// The contents of this file are subject to the Mozilla Public License
// Version 1.1 (the "License"); you may not use this file except in
// compliance with the License. You may obtain a copy of the License
// at http://www.mozilla.org/MPL/
//
// Software distributed under the License is distributed on an "AS IS"
// basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
// the License for the specific language governing rights and
// limitations under the License.
//
// The Original Code is RabbitMQ.
//
// The Initial Developer of the Original Code is GoPivotal, Inc.
// Copyright (c) 2007-2016 Pivotal Software, Inc. All rights reserved.
//
package com.rabbitmq.client.impl;
import java.io.IOException;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.SortedSet;
import java.util.TreeSet;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeoutException;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.AMQP.BasicProperties;
import com.rabbitmq.client.Command;
import com.rabbitmq.client.ConfirmListener;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.Envelope;
import com.rabbitmq.client.FlowListener;
import com.rabbitmq.client.GetResponse;
import com.rabbitmq.client.Method;
import com.rabbitmq.client.MessageProperties;
import com.rabbitmq.client.ReturnListener;
import com.rabbitmq.client.ShutdownSignalException;
import com.rabbitmq.client.UnexpectedMethodError;
import com.rabbitmq.client.impl.AMQImpl.Basic;
import com.rabbitmq.client.impl.AMQImpl.Channel;
import com.rabbitmq.client.impl.AMQImpl.Confirm;
import com.rabbitmq.client.impl.AMQImpl.Exchange;
import com.rabbitmq.client.impl.AMQImpl.Queue;
import com.rabbitmq.client.impl.AMQImpl.Tx;
import com.rabbitmq.utility.Utility;
/**
* Main interface to AMQP protocol functionality. Public API -
* Implementation of all AMQChannels except channel zero.
* <p>
* To open a channel,
* <pre>
* {@link Connection} conn = ...;
* {@link ChannelN} ch1 = conn.{@link Connection#createChannel createChannel}();
* </pre>
*/
public class ChannelN extends AMQChannel implements com.rabbitmq.client.Channel {
/** Empty out-of-band value passed to {@code Channel.Open} when the channel is opened. */
private static final String UNSPECIFIED_OUT_OF_BAND = "";
/** Map from consumer tag to {@link Consumer} instance.
* <p>
* Note that, in general, this map should ONLY ever be accessed
* from the connection's reader thread. We go to some pains to
* ensure this is the case - see the use of
* BlockingRpcContinuation to inject code into the reader thread
* in basicConsume and basicCancel.
* <p>
* The map is additionally wrapped with {@link Collections#synchronizedMap}.
*/
private final Map<String, Consumer> _consumers =
Collections.synchronizedMap(new HashMap<String, Consumer>());
/* All listener collections are CopyOnWriteArrayList instances. */
/** The ReturnListener collection. */
private final Collection<ReturnListener> returnListeners = new CopyOnWriteArrayList<ReturnListener>();
/** The FlowListener collection. */
private final Collection<FlowListener> flowListeners = new CopyOnWriteArrayList<FlowListener>();
/** The ConfirmListener collection. */
private final Collection<ConfirmListener> confirmListeners = new CopyOnWriteArrayList<ConfirmListener>();
/** Sequence number of next published message requiring confirmation.
* While this is still 0, waitForConfirms rejects the call with
* "Confirms not selected". */
private long nextPublishSeqNo = 0L;
/** The current default consumer, or null if there is none. */
private volatile Consumer defaultConsumer = null;
/** Dispatcher of consumer work for this channel */
private final ConsumerDispatcher dispatcher;
/** Latch used as a "future boolean" for shutting down; starts out null. */
private volatile CountDownLatch finishedShutdownFlag = null;
/** Set of currently unconfirmed messages (i.e. messages that have
* not been ack'd or nack'd by the server yet). The set object is also
* the monitor that waitForConfirms synchronizes and waits on. */
private final SortedSet<Long> unconfirmedSet =
Collections.synchronizedSortedSet(new TreeSet<Long>());
/** Whether only acks (i.e. no nacks) have been received since the last
* waitForConfirms(). waitForConfirms returns this value once the
* unconfirmed set is empty and then resets it to true. */
private volatile boolean onlyAcksReceived = true;
/**
 * Creates a channel with the given number on the given connection.
 * Application code normally obtains channels through
 * {@link Connection#createChannel} rather than calling this constructor.
 * <p>
 * Besides delegating to the superclass constructor, this sets up the
 * {@link ConsumerDispatcher} that this channel uses for consumer work.
 *
 * @see Connection#createChannel
 * @param connection The connection associated with this channel
 * @param channelNumber The channel number to be associated with this channel
 * @param workService service for managing this channel's consumer callbacks
 */
public ChannelN(AMQConnection connection, int channelNumber,
                ConsumerWorkService workService) {
    super(connection, channelNumber);
    // The dispatcher is handed this channel, the owning connection and the work service.
    dispatcher = new ConsumerDispatcher(connection, this, workService);
}
/**
 * Opens the channel by issuing a {@code Channel.Open} RPC.
 * Intended for use by {@link ChannelManager} only, although the
 * method itself is declared public.
 *
 * @throws IOException if any problem is encountered
 */
public void open() throws IOException {
    final Channel.Open openRequest = new Channel.Open(UNSPECIFIED_OUT_OF_BAND);
    // Block until the Channel.OpenOk response arrives; its content is not needed.
    exnWrappingRpc(openRequest);
}
/**
 * Registers a {@link ReturnListener} on this channel.
 * The listener is added to the channel's copy-on-write listener collection.
 *
 * @param listener the listener to add
 */
public void addReturnListener(ReturnListener listener) {
    this.returnListeners.add(listener);
}
/**
 * Unregisters a previously added {@link ReturnListener}.
 *
 * @param listener the listener to remove
 * @return {@code true} if the listener collection changed as a result of the call
 */
public boolean removeReturnListener(ReturnListener listener) {
    final boolean removed = this.returnListeners.remove(listener);
    return removed;
}
/**
 * Removes every {@link ReturnListener} registered on this channel.
 */
public void clearReturnListeners() {
    this.returnListeners.clear();
}
/**
 * Registers a {@link FlowListener} on this channel.
 * The listener is added to the channel's copy-on-write listener collection.
 * This method is marked deprecated.
 *
 * @param listener the listener to add
 */
@SuppressWarnings("deprecation")
@Deprecated
public void addFlowListener(FlowListener listener) {
    this.flowListeners.add(listener);
}
/**
 * Unregisters a previously added {@link FlowListener}.
 * This method is marked deprecated.
 *
 * @param listener the listener to remove
 * @return {@code true} if the listener collection changed as a result of the call
 */
@SuppressWarnings("deprecation")
@Deprecated
public boolean removeFlowListener(FlowListener listener) {
    final boolean removed = this.flowListeners.remove(listener);
    return removed;
}
/**
 * Removes every {@link FlowListener} registered on this channel.
 * This method is marked deprecated.
 */
@SuppressWarnings("deprecation")
@Deprecated
public void clearFlowListeners() {
    this.flowListeners.clear();
}
/**
 * Registers a {@link ConfirmListener} on this channel.
 * The listener is added to the channel's copy-on-write listener collection.
 *
 * @param listener the listener to add
 */
public void addConfirmListener(ConfirmListener listener) {
    this.confirmListeners.add(listener);
}
/**
 * Unregisters a previously added {@link ConfirmListener}.
 *
 * @param listener the listener to remove
 * @return {@code true} if the listener collection changed as a result of the call
 */
public boolean removeConfirmListener(ConfirmListener listener) {
    final boolean removed = this.confirmListeners.remove(listener);
    return removed;
}
/**
 * Removes every {@link ConfirmListener} registered on this channel.
 */
public void clearConfirmListeners() {
    this.confirmListeners.clear();
}
/**
 * {@inheritDoc}
 * <p>
 * Delegates to {@link #waitForConfirms(long)} with a timeout of {@code 0},
 * which waits without a time limit.
 */
public boolean waitForConfirms()
        throws InterruptedException
{
    try {
        return waitForConfirms(0L);
    } catch (TimeoutException ignored) {
        // A zero timeout takes the untimed wait() branch, so the timed
        // overload has no path that raises this; report "not confirmed"
        // if it ever does.
        return false;
    }
}
/**
 * {@inheritDoc}
 * <p>
 * Waits on the monitor of the unconfirmed set until that set is empty,
 * the channel has a close reason, or the timeout runs out. Elapsed time
 * is measured with {@link System#nanoTime()}, so adjustments to the
 * system clock can neither cut the wait short nor extend it.
 *
 * @param timeout maximum time to wait, in milliseconds; {@code 0} waits
 *        without a time limit
 * @return the value of the only-acks-received flag at the moment the
 *         unconfirmed set was found empty; the flag is reset to
 *         {@code true} before returning
 * @throws IllegalStateException if the next publish sequence number is
 *         still 0 ("Confirms not selected")
 * @throws InterruptedException if the waiting thread is interrupted
 * @throws TimeoutException if {@code timeout} is non-zero and has elapsed
 *         while messages are still unconfirmed
 */
public boolean waitForConfirms(long timeout)
        throws InterruptedException, TimeoutException {
    if (nextPublishSeqNo == 0L)
        throw new IllegalStateException("Confirms not selected");
    // Monotonic time source: System.currentTimeMillis() follows the wall
    // clock and may jump forwards or backwards while we are waiting.
    final long startNanos = System.nanoTime();
    synchronized (unconfirmedSet) {
        while (true) {
            // A closed channel will never confirm anything; surface the
            // close reason with a stack trace for the calling thread.
            if (getCloseReason() != null) {
                throw Utility.fixStackTrace(getCloseReason());
            }
            if (unconfirmedSet.isEmpty()) {
                // Hand back the flag and re-arm it for the next batch.
                boolean aux = onlyAcksReceived;
                onlyAcksReceived = true;
                return aux;
            }
            if (timeout == 0L) {
                unconfirmedSet.wait();
            } else {
                long elapsedMillis = (System.nanoTime() - startNanos) / 1000000L;
                if (timeout > elapsedMillis) {
                    // Remaining time is strictly positive here, so this
                    // can never turn into an untimed wait(0).
                    unconfirmedSet.wait(timeout - elapsedMillis);
                } else {
                    throw new TimeoutException();
                }
            }
        }
    }
}
/** {@inheritDoc