badDataListeners;
private boolean metaInfoRequired;
/**
* Whether to disregard bundle time-stamps for dispatch-scheduling.
@@ -149,8 +143,8 @@ public OSCPacketDispatcher(
this.typeTagsCharset = (propertiesCharset == null)
? Charset.defaultCharset()
: propertiesCharset;
- this.selectiveMessageListeners = new ArrayList<>();
- this.badDataListeners = new ArrayList<>();
+ this.selectiveMessageListeners = new ConcurrentLinkedQueue<>();
+ this.badDataListeners = new ConcurrentLinkedQueue<>();
this.metaInfoRequired = false;
this.alwaysDispatchingImmediately = false;
this.dispatchScheduler = dispatchScheduler;
diff --git a/modules/core/src/main/java/com/illposed/osc/transport/channel/OSCDatagramChannel.java b/modules/core/src/main/java/com/illposed/osc/transport/channel/OSCDatagramChannel.java
index 1d5ee59e..d54fa93e 100644
--- a/modules/core/src/main/java/com/illposed/osc/transport/channel/OSCDatagramChannel.java
+++ b/modules/core/src/main/java/com/illposed/osc/transport/channel/OSCDatagramChannel.java
@@ -37,30 +37,54 @@
*/
public class OSCDatagramChannel extends SelectableChannel {
- private final DatagramChannel underlyingChannel;
- private final OSCParser parser;
- private final OSCSerializerAndParserBuilder serializerBuilder;
-
- public OSCDatagramChannel(
- final DatagramChannel underlyingChannel,
- final OSCSerializerAndParserBuilder serializerAndParserBuilder
- )
- {
- this.underlyingChannel = underlyingChannel;
- OSCParser tmpParser = null;
- if (serializerAndParserBuilder != null) {
- tmpParser = serializerAndParserBuilder.buildParser();
- }
- this.parser = tmpParser;
- this.serializerBuilder = serializerAndParserBuilder;
- }
-
- public OSCPacket read(final ByteBuffer buffer) throws IOException, OSCParseException {
-
- boolean completed = false;
- OSCPacket oscPacket;
- try {
- begin();
+ private final DatagramChannel underlyingChannel;
+ private final OSCParser parser;
+ private final OSCSerializerAndParserBuilder serializerBuilder;
+
+ public static class OSCPacketWithSource {
+ private OSCPacket packet;
+ private SocketAddress source;
+
+ public OSCPacketWithSource(OSCPacket packet, SocketAddress source) {
+ this.packet = packet;
+ this.source = source;
+ }
+
+ public OSCPacket getPacket() {
+ return packet;
+ }
+
+ public SocketAddress getSource() {
+ return source;
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ return ((obj instanceof OSCPacketWithSource))
+ && (((OSCPacketWithSource) obj).packet == packet
+ && ((OSCPacketWithSource) obj).source == source);
+ }
+ }
+
+ public OSCDatagramChannel(
+ final DatagramChannel underlyingChannel,
+ final OSCSerializerAndParserBuilder serializerAndParserBuilder
+ ) {
+ this.underlyingChannel = underlyingChannel;
+ OSCParser tmpParser = null;
+ if (serializerAndParserBuilder != null) {
+ tmpParser = serializerAndParserBuilder.buildParser();
+ }
+ this.parser = tmpParser;
+ this.serializerBuilder = serializerAndParserBuilder;
+ }
+
+ public OSCPacketWithSource read(final ByteBuffer buffer) throws IOException, OSCParseException {
+ boolean completed = false;
+ OSCPacket oscPacket;
+ SocketAddress peer;
+ try {
+ begin();
buffer.clear();
// NOTE From the doc of `read()` and `receive()`:
@@ -68,109 +92,111 @@ public OSCPacket read(final ByteBuffer buffer) throws IOException, OSCParseExcep
// than are required to hold the datagram
// then the remainder of the datagram is silently discarded."
if (underlyingChannel.isConnected()) {
+ peer = underlyingChannel.getRemoteAddress();
underlyingChannel.read(buffer);
} else {
- underlyingChannel.receive(buffer);
+ peer = underlyingChannel.receive(buffer);
}
// final int readBytes = buffer.position();
// if (readBytes == buffer.capacity()) {
// // TODO In this case it is very likely that the buffer was actually too small, and the remainder of the datagram/packet was silently discarded. We might want to give a warning, like throw an exception in this case, but whether this happens should probably be user configurable.
// }
- buffer.flip();
- if (buffer.limit() == 0) {
- throw new OSCParseException("Received a packet without any data");
- } else {
- oscPacket = parser.convert(buffer);
- completed = true;
- }
- } finally {
- end(completed);
- }
-
- return oscPacket;
- }
-
- public void send(final ByteBuffer buffer, final OSCPacket packet, final SocketAddress remoteAddress) throws IOException, OSCSerializeException {
-
- boolean completed = false;
- try {
- begin();
-
- final OSCSerializer serializer = serializerBuilder.buildSerializer(buffer);
- buffer.rewind();
- serializer.write(packet);
- buffer.flip();
- if (underlyingChannel.isConnected()) {
- underlyingChannel.write(buffer);
- } else if (remoteAddress == null) {
- throw new IllegalStateException("Not connected and no remote address is given");
- } else {
- underlyingChannel.send(buffer, remoteAddress);
- }
- completed = true;
- } finally {
- end(completed);
- }
- }
-
- public void write(final ByteBuffer buffer, final OSCPacket packet) throws IOException, OSCSerializeException {
-
- boolean completed = false;
- try {
- begin();
- if (!underlyingChannel.isConnected()) {
- throw new IllegalStateException("Either connect the channel or use write()");
- }
- send(buffer, packet, null);
- completed = true;
- } finally {
- end(completed);
- }
- }
-
- @Override
- public SelectorProvider provider() {
- return underlyingChannel.provider();
- }
-
- @Override
- public boolean isRegistered() {
- return underlyingChannel.isRegistered();
- }
-
- @Override
- public SelectionKey keyFor(final Selector sel) {
- return underlyingChannel.keyFor(sel);
- }
-
- @Override
- public SelectionKey register(final Selector sel, final int ops, final Object att) throws ClosedChannelException {
- return underlyingChannel.register(sel, ops, att);
- }
-
- @Override
- public SelectableChannel configureBlocking(final boolean block) throws IOException {
- return underlyingChannel.configureBlocking(block);
- }
-
- @Override
- public boolean isBlocking() {
- return underlyingChannel.isBlocking();
- }
-
- @Override
- public Object blockingLock() {
- return underlyingChannel.blockingLock();
- }
-
- @Override
- protected void implCloseChannel() throws IOException {
- // XXX is this ok?
- underlyingChannel.close();
- }
-
- @Override
- public int validOps() {
- return underlyingChannel.validOps();
- }
+ buffer.flip();
+ if (buffer.limit() == 0) {
+ throw new OSCParseException("Received a packet without any data");
+ } else {
+ oscPacket = parser.convert(buffer);
+ completed = true;
+ }
+ } finally {
+ end(completed);
+ }
+
+ return new OSCPacketWithSource(oscPacket, peer);
+ }
+
+
+ public void send(final ByteBuffer buffer, final OSCPacket packet, final SocketAddress remoteAddress) throws IOException, OSCSerializeException {
+
+ boolean completed = false;
+ try {
+ begin();
+
+ final OSCSerializer serializer = serializerBuilder.buildSerializer(buffer);
+ buffer.rewind();
+ serializer.write(packet);
+ buffer.flip();
+ if (underlyingChannel.isConnected()) {
+ underlyingChannel.write(buffer);
+ } else if (remoteAddress == null) {
+ throw new IllegalStateException("Not connected and no remote address is given");
+ } else {
+ underlyingChannel.send(buffer, remoteAddress);
+ }
+ completed = true;
+ } finally {
+ end(completed);
+ }
+ }
+
+ public void write(final ByteBuffer buffer, final OSCPacket packet) throws IOException, OSCSerializeException {
+
+ boolean completed = false;
+ try {
+ begin();
+ if (!underlyingChannel.isConnected()) {
+ throw new IllegalStateException("Either connect the channel or use write()");
+ }
+ send(buffer, packet, null);
+ completed = true;
+ } finally {
+ end(completed);
+ }
+ }
+
+ @Override
+ public SelectorProvider provider() {
+ return underlyingChannel.provider();
+ }
+
+ @Override
+ public boolean isRegistered() {
+ return underlyingChannel.isRegistered();
+ }
+
+ @Override
+ public SelectionKey keyFor(final Selector sel) {
+ return underlyingChannel.keyFor(sel);
+ }
+
+ @Override
+ public SelectionKey register(final Selector sel, final int ops, final Object att) throws ClosedChannelException {
+ return underlyingChannel.register(sel, ops, att);
+ }
+
+ @Override
+ public SelectableChannel configureBlocking(final boolean block) throws IOException {
+ return underlyingChannel.configureBlocking(block);
+ }
+
+ @Override
+ public boolean isBlocking() {
+ return underlyingChannel.isBlocking();
+ }
+
+ @Override
+ public Object blockingLock() {
+ return underlyingChannel.blockingLock();
+ }
+
+ @Override
+ protected void implCloseChannel() throws IOException {
+ // XXX is this ok?
+ underlyingChannel.close();
+ }
+
+ @Override
+ public int validOps() {
+ return underlyingChannel.validOps();
+ }
}
diff --git a/modules/core/src/main/java/com/illposed/osc/transport/udp/OSCPortIn.java b/modules/core/src/main/java/com/illposed/osc/transport/udp/OSCPortIn.java
index 1713009b..e6d57e0f 100644
--- a/modules/core/src/main/java/com/illposed/osc/transport/udp/OSCPortIn.java
+++ b/modules/core/src/main/java/com/illposed/osc/transport/udp/OSCPortIn.java
@@ -19,6 +19,7 @@
import com.illposed.osc.transport.channel.OSCDatagramChannel;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
@@ -29,7 +30,7 @@
/**
* Listens for OSC packets on a UDP/IP port.
- *
+ *
* An example:
*
{@code
* // listens on the wildcard address (all local network interfaces)
@@ -49,367 +50,399 @@
* //receiver.getDispatcher().setAlwaysDispatchingImmediately(true);
* receiver.startListening();
* }
- *
+ *
* Then, using a program such as SuperCollider or sendOSC, send a message
* to this computer, port {@link #DEFAULT_SC_OSC_PORT},
* with the address "/message/receiving".
*/
public class OSCPortIn extends OSCPort implements Runnable {
- private final Logger log = LoggerFactory.getLogger(OSCPortIn.class);
-
- // Public API
- /**
- * Buffers were 1500 bytes in size, but were increased to 1536, as this is a common MTU,
- * and then increased to 65507, as this is the maximum incoming datagram data size.
- */
- @SuppressWarnings("WeakerAccess")
- public static final int BUFFER_SIZE = 65507;
-
- private volatile boolean listening;
- private boolean daemonListener;
- private boolean resilient;
- private Thread listeningThread;
- private final OSCSerializerAndParserBuilder parserBuilder;
- private final List packetListeners;
-
- public static OSCPacketDispatcher getDispatcher(
- final List listeners)
- {
- return listeners.stream()
- .filter(OSCPacketDispatcher.class::isInstance)
- .findFirst()
- .map(OSCPacketDispatcher.class::cast)
- .orElse(null);
- }
-
- public static OSCPacketListener defaultPacketListener() {
- final OSCPacketDispatcher dispatcher = new OSCPacketDispatcher();
- // Until version 0.4, we did the following property to true
,
- // because this is how it always worked in this library until Feb. 2015.,
- // and thus users of this library expected this behaviour by default.
- // It is against the OSC (1.0) specification though,
- // so since version 0.5 of this library, we set it to false
.
- dispatcher.setAlwaysDispatchingImmediately(false);
-
- return dispatcher;
- }
-
- public static List defaultPacketListeners() {
- final List listeners = new ArrayList<>();
- listeners.add(defaultPacketListener());
- return listeners;
- }
-
- /**
- * Create an OSC-Port that listens on the given local socket for packets from {@code remote},
- * using a parser created with the given factory,
- * and with {@link #isResilient() resilient} set to true.
- * @param parserBuilder to create the internal parser from
- * @param packetListeners to handle received and serialized OSC packets
- * @param local address to listen on
- * @param remote address to listen to
- * @throws IOException if we fail to bind a channel to the local address
- */
- public OSCPortIn(
- final OSCSerializerAndParserBuilder parserBuilder,
- final List packetListeners,
- final SocketAddress local,
- final SocketAddress remote)
- throws IOException
- {
- super(local, remote);
-
- this.listening = false;
- this.daemonListener = true;
- this.resilient = true;
- this.parserBuilder = parserBuilder;
- this.packetListeners = packetListeners;
- }
-
- public OSCPortIn(
- final OSCSerializerAndParserBuilder parserBuilder,
- final List packetListeners,
- final SocketAddress local)
- throws IOException
- {
- this(
- parserBuilder,
- packetListeners,
- local,
- new InetSocketAddress(OSCPort.generateWildcard(local), 0)
- );
- }
-
- public OSCPortIn(
- final OSCSerializerAndParserBuilder parserBuilder,
- final SocketAddress local)
- throws IOException
- {
- this(parserBuilder, defaultPacketListeners(), local);
- }
-
- public OSCPortIn(final OSCSerializerAndParserBuilder parserBuilder, final int port)
- throws IOException
- {
- this(parserBuilder, new InetSocketAddress(port));
- }
-
- /**
- * Creates an OSC-Port that listens on the given local socket.
- * @param local address to listen on
- * @throws IOException if we fail to bind a channel to the local address
- */
- public OSCPortIn(final SocketAddress local) throws IOException {
- this(new OSCSerializerAndParserBuilder(), local);
- }
-
- /**
- * Creates an OSC-Port that listens on the wildcard address
- * (all local network interfaces) on the specified local port.
- * @param port port number to listen on
- * @throws IOException if we fail to bind a channel to the local address
- */
- public OSCPortIn(final int port) throws IOException {
- this(new InetSocketAddress(port));
- }
-
- /**
- * Creates an OSC-Port that listens on the wildcard address
- * (all local network interfaces) on the default local port
- * {@link #DEFAULT_SC_OSC_PORT}.
- * @throws IOException if we fail to bind a channel to the local address
- */
- public OSCPortIn() throws IOException {
- this(defaultSCOSCPort());
- }
-
- /**
- * Run the loop that listens for OSC on a socket until
- * {@link #isListening()} becomes false.
- * @see Runnable#run()
- */
- @Override
- public void run() {
-
- final ByteBuffer buffer = ByteBuffer.allocate(BUFFER_SIZE);
- final DatagramChannel channel = getChannel();
- final OSCDatagramChannel oscChannel = new OSCDatagramChannel(channel, parserBuilder);
- while (listening) {
- try {
- final OSCPacket oscPacket = oscChannel.read(buffer);
-
- final OSCPacketEvent event = new OSCPacketEvent(this, oscPacket);
- for (final OSCPacketListener listener : packetListeners) {
- listener.handlePacket(event);
- }
- } catch (final IOException ex) {
- if (isListening()) {
- stopListening(ex);
- } else {
- stopListening();
- }
- } catch (final OSCParseException ex) {
- badPacketReceived(ex, buffer);
- }
- }
- }
-
- private void stopListening(final Exception exception) {
-
- // TODO This implies a low-level problem (for example network IO), but it could just aswell be a high-level one (for example a parse exception)
- final String errorMsg = "Error while listening on " + toString() + "...";
- log.error(errorMsg);
- if (exception instanceof OSCParseException) {
- log.error(errorMsg);
- } else {
- log.error(errorMsg, exception);
- }
- stopListening();
- }
-
- private void badPacketReceived(final OSCParseException exception, final ByteBuffer data) {
-
- final OSCBadDataEvent badDataEvt = new OSCBadDataEvent(this, data, exception);
-
- for (final OSCPacketListener listener : packetListeners) {
- listener.handleBadData(badDataEvt);
- }
-
- if (!isResilient()) {
- stopListening(exception);
- }
- }
-
- // Public API
- /**
- * Start listening for incoming OSCPackets
- */
- @SuppressWarnings("WeakerAccess")
- public void startListening() {
-
- // NOTE This is not thread-save
- if (!isListening()) {
- listening = true;
- listeningThread = new Thread(this);
- // The JVM exits when the only threads running are all daemon threads.
- listeningThread.setDaemon(daemonListener);
- listeningThread.start();
- }
- }
-
- // Public API
- /**
- * Stop listening for incoming OSCPackets
- */
- @SuppressWarnings("WeakerAccess")
- public void stopListening() {
-
- listening = false;
- // NOTE This is not thread-save
- if (getChannel().isBlocking()) {
- try {
- getChannel().close();
- } catch (final IOException ex) {
- log.error("Failed to close OSC UDP channel", ex);
- }
- }
- }
-
- // Public API
- /**
- * Is this port listening for packets?
- * @return true if this port is in listening mode
- */
- @SuppressWarnings("WeakerAccess")
- public boolean isListening() {
- return listening;
- }
-
- // Public API
- /**
- * Is this port listening for packets in daemon mode?
- * @see #setDaemonListener
- * @return true
if this ports listening thread is/would be in daemon mode
- */
- @SuppressWarnings({"WeakerAccess", "unused"})
- public boolean isDaemonListener() {
- return daemonListener;
- }
-
- // Public API
- /**
- * Set whether this port should be listening for packets in daemon mode.
- * The Java Virtual Machine exits when the only threads running are all daemon threads.
- * This is true
by default.
- * Probably the only feasible reason to set this to false
,
- * is if the code in the listener is very small,
- * and the application consists of nothing more then this listening thread.
- * @see Thread#setDaemon(boolean)
- * @param daemonListener whether this ports listening thread should be in daemon mode
- */
- @SuppressWarnings("WeakerAccess")
- public void setDaemonListener(final boolean daemonListener) {
-
- if (isListening()) {
- listeningThread.setDaemon(daemonListener);
- }
- this.daemonListener = daemonListener;
- }
-
- // Public API
- /**
- * Whether this port continues listening and throws
- * a {@link OSCParseException} after receiving a bad packet.
- * @return true
if this port will continue listening
- * after a parse exception
- */
- @SuppressWarnings("WeakerAccess")
- public boolean isResilient() {
- return resilient;
- }
-
- // Public API
- /**
- * Set whether this port continues listening and throws
- * a {@link OSCParseException} after receiving a bad packet.
- * @param resilient whether this port should continue listening
- * after a parse exception
- */
- @SuppressWarnings("WeakerAccess")
- public void setResilient(final boolean resilient) {
- this.resilient = resilient;
- }
-
- @Override
- public void close() throws IOException {
-
- if (isListening()) {
- stopListening();
- }
- super.close();
- }
-
- @Override
- public String toString() {
-
- final StringBuilder rep = new StringBuilder(32);
-
- rep
- .append('[')
- .append(getClass().getSimpleName())
- .append(": ");
- if (isListening()) {
- rep
- .append("listening on \"")
- .append(getLocalAddress().toString())
- .append('\"');
- } else {
- rep.append("stopped");
- }
- rep.append(']');
-
- return rep.toString();
- }
-
- // Public API
- @SuppressWarnings("WeakerAccess")
- public OSCPacketDispatcher getDispatcher() {
- final OSCPacketDispatcher dispatcher = getDispatcher(packetListeners);
-
- if (dispatcher == null) {
- throw new IllegalStateException(
- "OSCPortIn packet listeners do not include a dispatcher.");
- }
-
- return dispatcher;
- }
-
- public List getPacketListeners() {
- return packetListeners;
- }
-
- /**
- * Adds a listener that will handle all packets received.
- * This includes bundles and individual (non-bundled) messages.
- * Registered listeners will be notified of packets in the order they were
- * added to the dispatcher.
- * A listener can be registered multiple times, and will consequently be
- * notified as many times as it was added.
- * @param listener receives and handles packets
- */
- public void addPacketListener(final OSCPacketListener listener) {
- packetListeners.add(listener);
- }
-
- /**
- * Removes a packet listener, which will no longer be notified of incoming
- * packets.
- * Removes only the first occurrence of the listener.
- * @param listener will no longer receive packets
- */
- public void removePacketListener(final OSCPacketListener listener) {
- packetListeners.remove(listener);
- }
+ private final Logger log = LoggerFactory.getLogger(OSCPortIn.class);
+
+ // Public API
+ /**
+ * Buffers were 1500 bytes in size, but were increased to 1536, as this is a common MTU,
+ * and then increased to 65507, as this is the maximum incoming datagram data size.
+ */
+ @SuppressWarnings("WeakerAccess")
+ public static final int BUFFER_SIZE = 65507;
+
+ private volatile boolean listening;
+ private boolean daemonListener;
+ private boolean resilient;
+ private Thread listeningThread;
+ private final OSCSerializerAndParserBuilder parserBuilder;
+ private final List packetListeners;
+
+ public static class OSCPortInSource {
+ private OSCPortIn port;
+ private SocketAddress sender;
+
+ OSCPortInSource(OSCPortIn port, SocketAddress sender) {
+ this.port = port;
+ this.sender = sender;
+ }
+
+ public OSCPortIn getPort() {
+ return port;
+ }
+
+ public SocketAddress getSender() {
+ return sender;
+ }
+ }
+
+ public static OSCPacketDispatcher getDispatcher(
+ final List listeners) {
+ return listeners.stream()
+ .filter(OSCPacketDispatcher.class::isInstance)
+ .findFirst()
+ .map(OSCPacketDispatcher.class::cast)
+ .orElse(null);
+ }
+
+ public static OSCPacketListener defaultPacketListener() {
+ final OSCPacketDispatcher dispatcher = new OSCPacketDispatcher();
+ // Until version 0.4, we did the following property to true
,
+ // because this is how it always worked in this library until Feb. 2015.,
+ // and thus users of this library expected this behaviour by default.
+ // It is against the OSC (1.0) specification though,
+ // so since version 0.5 of this library, we set it to false
.
+ dispatcher.setAlwaysDispatchingImmediately(false);
+
+ return dispatcher;
+ }
+
+ public static List defaultPacketListeners() {
+ final List listeners = new ArrayList<>();
+ listeners.add(defaultPacketListener());
+ return listeners;
+ }
+
+ /**
+ * Create an OSC-Port that listens on the given local socket for packets from {@code remote},
+ * using a parser created with the given factory,
+ * and with {@link #isResilient() resilient} set to true.
+ *
+ * @param parserBuilder to create the internal parser from
+ * @param packetListeners to handle received and serialized OSC packets
+ * @param local address to listen on
+ * @param remote address to listen to
+ * @throws IOException if we fail to bind a channel to the local address
+ */
+ public OSCPortIn(
+ final OSCSerializerAndParserBuilder parserBuilder,
+ final List packetListeners,
+ final SocketAddress local,
+ final SocketAddress remote)
+ throws IOException {
+ super(local, remote);
+
+ this.listening = false;
+ this.daemonListener = true;
+ this.resilient = true;
+ this.parserBuilder = parserBuilder;
+ this.packetListeners = packetListeners;
+ }
+
+ public OSCPortIn(
+ final OSCSerializerAndParserBuilder parserBuilder,
+ final List packetListeners,
+ final SocketAddress local)
+ throws IOException {
+ this(
+ parserBuilder,
+ packetListeners,
+ local,
+ new InetSocketAddress(OSCPort.generateWildcard(local), 0)
+ );
+ }
+
+ public OSCPortIn(
+ final OSCSerializerAndParserBuilder parserBuilder,
+ final SocketAddress local)
+ throws IOException {
+ this(parserBuilder, defaultPacketListeners(), local);
+ }
+
+ public OSCPortIn(final OSCSerializerAndParserBuilder parserBuilder, final int port)
+ throws IOException {
+ this(parserBuilder, new InetSocketAddress(port));
+ }
+
+ /**
+ * Creates an OSC-Port that listens on the given local socket.
+ *
+ * @param local address to listen on
+ * @throws IOException if we fail to bind a channel to the local address
+ */
+ public OSCPortIn(final SocketAddress local) throws IOException {
+ this(new OSCSerializerAndParserBuilder(), local);
+ }
+
+ /**
+ * Creates an OSC-Port that listens on the wildcard address
+ * (all local network interfaces) on the specified local port.
+ *
+ * @param port port number to listen on
+ * @throws IOException if we fail to bind a channel to the local address
+ */
+ public OSCPortIn(final int port) throws IOException {
+ this(new InetSocketAddress(port));
+ }
+
+ /**
+ * Creates an OSC-Port that listens on the wildcard address
+ * (all local network interfaces) on the default local port
+ * {@link #DEFAULT_SC_OSC_PORT}.
+ *
+ * @throws IOException if we fail to bind a channel to the local address
+ */
+ public OSCPortIn() throws IOException {
+ this(defaultSCOSCPort());
+ }
+
+ /**
+ * Run the loop that listens for OSC on a socket until
+ * {@link #isListening()} becomes false.
+ *
+ * @see Runnable#run()
+ */
+ @Override
+ public void run() {
+
+ final ByteBuffer buffer = ByteBuffer.allocate(BUFFER_SIZE);
+ final DatagramChannel channel = getChannel();
+ final OSCDatagramChannel oscChannel = new OSCDatagramChannel(channel, parserBuilder);
+ while (listening) {
+ try {
+ final OSCDatagramChannel.OSCPacketWithSource packetWithSource = oscChannel.read(buffer);
+
+ final OSCPacketEvent event = new OSCPacketEvent(new OSCPortInSource(this, packetWithSource.getSource()), packetWithSource.getPacket());
+ for (final OSCPacketListener listener : packetListeners) {
+ listener.handlePacket(event);
+ }
+ } catch (final IOException ex) {
+ if (isListening()) {
+ stopListening(ex);
+ } else {
+ stopListening();
+ }
+ } catch (final OSCParseException ex) {
+ badPacketReceived(ex, buffer);
+ }
+ }
+ }
+
+ private void stopListening(final Exception exception) {
+
+ // TODO This implies a low-level problem (for example network IO), but it could just aswell be a high-level one (for example a parse exception)
+ final String errorMsg = "Error while listening on " + toString() + "...";
+ log.error(errorMsg);
+ if (exception instanceof OSCParseException) {
+ log.error(errorMsg);
+ } else {
+ log.error(errorMsg, exception);
+ }
+ stopListening();
+ }
+
+ private void badPacketReceived(final OSCParseException exception, final ByteBuffer data) {
+
+ final OSCBadDataEvent badDataEvt = new OSCBadDataEvent(this, data, exception);
+
+ for (final OSCPacketListener listener : packetListeners) {
+ listener.handleBadData(badDataEvt);
+ }
+
+ if (!isResilient()) {
+ stopListening(exception);
+ }
+ }
+
+ // Public API
+
+ /**
+ * Start listening for incoming OSCPackets
+ */
+ @SuppressWarnings("WeakerAccess")
+ public void startListening() {
+
+ // NOTE This is not thread-save
+ if (!isListening()) {
+ listening = true;
+ listeningThread = new Thread(this);
+ // The JVM exits when the only threads running are all daemon threads.
+ listeningThread.setDaemon(daemonListener);
+ listeningThread.start();
+ }
+ }
+
+ // Public API
+
+ /**
+ * Stop listening for incoming OSCPackets
+ */
+ @SuppressWarnings("WeakerAccess")
+ public void stopListening() {
+
+ listening = false;
+ // NOTE This is not thread-save
+ if (getChannel().isBlocking()) {
+ try {
+ getChannel().close();
+ } catch (final IOException ex) {
+ log.error("Failed to close OSC UDP channel", ex);
+ }
+ }
+ }
+
+ // Public API
+
+ /**
+ * Is this port listening for packets?
+ *
+ * @return true if this port is in listening mode
+ */
+ @SuppressWarnings("WeakerAccess")
+ public boolean isListening() {
+ return listening;
+ }
+
+ // Public API
+
+ /**
+ * Is this port listening for packets in daemon mode?
+ *
+ * @return true
if this ports listening thread is/would be in daemon mode
+ * @see #setDaemonListener
+ */
+ @SuppressWarnings({"WeakerAccess", "unused"})
+ public boolean isDaemonListener() {
+ return daemonListener;
+ }
+
+ // Public API
+
+ /**
+ * Set whether this port should be listening for packets in daemon mode.
+ * The Java Virtual Machine exits when the only threads running are all daemon threads.
+ * This is true
by default.
+ * Probably the only feasible reason to set this to false
,
+ * is if the code in the listener is very small,
+ * and the application consists of nothing more then this listening thread.
+ *
+ * @param daemonListener whether this ports listening thread should be in daemon mode
+ * @see Thread#setDaemon(boolean)
+ */
+ @SuppressWarnings("WeakerAccess")
+ public void setDaemonListener(final boolean daemonListener) {
+
+ if (isListening()) {
+ listeningThread.setDaemon(daemonListener);
+ }
+ this.daemonListener = daemonListener;
+ }
+
+ // Public API
+
+ /**
+ * Whether this port continues listening and throws
+ * a {@link OSCParseException} after receiving a bad packet.
+ *
+ * @return true
if this port will continue listening
+ * after a parse exception
+ */
+ @SuppressWarnings("WeakerAccess")
+ public boolean isResilient() {
+ return resilient;
+ }
+
+ // Public API
+
+ /**
+ * Set whether this port continues listening and throws
+ * a {@link OSCParseException} after receiving a bad packet.
+ *
+ * @param resilient whether this port should continue listening
+ * after a parse exception
+ */
+ @SuppressWarnings("WeakerAccess")
+ public void setResilient(final boolean resilient) {
+ this.resilient = resilient;
+ }
+
+ @Override
+ public void close() throws IOException {
+
+ if (isListening()) {
+ stopListening();
+ }
+ super.close();
+ }
+
+ @Override
+ public String toString() {
+
+ final StringBuilder rep = new StringBuilder(32);
+
+ rep
+ .append('[')
+ .append(getClass().getSimpleName())
+ .append(": ");
+ if (isListening()) {
+ rep
+ .append("listening on \"")
+ .append(getLocalAddress().toString())
+ .append('\"');
+ } else {
+ rep.append("stopped");
+ }
+ rep.append(']');
+
+ return rep.toString();
+ }
+
+ // Public API
+ @SuppressWarnings("WeakerAccess")
+ public OSCPacketDispatcher getDispatcher() {
+ final OSCPacketDispatcher dispatcher = getDispatcher(packetListeners);
+
+ if (dispatcher == null) {
+ throw new IllegalStateException(
+ "OSCPortIn packet listeners do not include a dispatcher.");
+ }
+
+ return dispatcher;
+ }
+
+ public List getPacketListeners() {
+ return packetListeners;
+ }
+
+ /**
+ * Adds a listener that will handle all packets received.
+ * This includes bundles and individual (non-bundled) messages.
+ * Registered listeners will be notified of packets in the order they were
+ * added to the dispatcher.
+ * A listener can be registered multiple times, and will consequently be
+ * notified as many times as it was added.
+ *
+ * @param listener receives and handles packets
+ */
+ public void addPacketListener(final OSCPacketListener listener) {
+ packetListeners.add(listener);
+ }
+
+ /**
+ * Removes a packet listener, which will no longer be notified of incoming
+ * packets.
+ * Removes only the first occurrence of the listener.
+ *
+ * @param listener will no longer receive packets
+ */
+ public void removePacketListener(final OSCPacketListener listener) {
+ packetListeners.remove(listener);
+ }
}