/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq;
import java.util.
Collections;
import java.util.
LinkedList;
import java.util.
List;
import javax.jms.
ConnectionConsumer;
import javax.jms.
IllegalStateException;
import javax.jms.
JMSException;
import javax.jms.
ServerSession;
import javax.jms.
ServerSessionPool;
import javax.jms.
Session;
import org.apache.activemq.command.
ConsumerId;
import org.apache.activemq.command.
ConsumerInfo;
import org.apache.activemq.command.
MessageDispatch;
/**
* For application servers, <CODE>Connection</CODE> objects provide a special
* facility for creating a <CODE>ConnectionConsumer</CODE> (optional). The
* messages it is to consume are specified by a <CODE>Destination</CODE> and a
* message selector. In addition, a <CODE>ConnectionConsumer</CODE> must be
* given a <CODE>ServerSessionPool</CODE> to use for processing its messages.
* <p/>
* <P>
* Normally, when traffic is light, a <CODE>ConnectionConsumer</CODE> gets a
* <CODE>ServerSession</CODE> from its pool, loads it with a single message,
* and starts it. As traffic picks up, messages can back up. If this happens, a
* <CODE>ConnectionConsumer</CODE> can load each <CODE>ServerSession</CODE>
* with more than one message. This reduces the thread context switches and
* minimizes resource use at the expense of some serialization of message
* processing.
*
* @see javax.jms.Connection#createConnectionConsumer
* @see javax.jms.Connection#createDurableConnectionConsumer
* @see javax.jms.QueueConnection#createConnectionConsumer
* @see javax.jms.TopicConnection#createConnectionConsumer
* @see javax.jms.TopicConnection#createDurableConnectionConsumer
*/
public class
ActiveMQConnectionConsumer implements
ConnectionConsumer,
ActiveMQDispatcher {
private
ActiveMQConnection connection;
private
ServerSessionPool sessionPool;
private
ConsumerInfo consumerInfo;
private boolean
closed;
/**
* Create a ConnectionConsumer
*
* @param theConnection
* @param theSessionPool
* @param theConsumerInfo
* @throws JMSException
*/
protected
ActiveMQConnectionConsumer(
ActiveMQConnection theConnection,
ServerSessionPool theSessionPool,
ConsumerInfo theConsumerInfo) throws
JMSException {
this.
connection =
theConnection;
this.
sessionPool =
theSessionPool;
this.
consumerInfo =
theConsumerInfo;
this.
connection.
addConnectionConsumer(this);
this.
connection.
addDispatcher(
consumerInfo.
getConsumerId(), this);
this.
connection.
syncSendPacket(this.
consumerInfo);
}
/**
* Gets the server session pool associated with this connection consumer.
*
* @return the server session pool used by this connection consumer
* @throws JMSException if the JMS provider fails to get the server session
* pool associated with this consumer due to some internal
* error.
*/
public
ServerSessionPool getServerSessionPool() throws
JMSException {
if (
closed) {
throw new
IllegalStateException("The Connection Consumer is closed");
}
return this.
sessionPool;
}
/**
* Closes the connection consumer. <p/>
* <P>
* Since a provider may allocate some resources on behalf of a connection
* consumer outside the Java virtual machine, clients should close these
* resources when they are not needed. Relying on garbage collection to
* eventually reclaim these resources may not be timely enough.
*
* @throws JMSException
*/
public void
close() throws
JMSException {
if (!
closed) {
dispose();
this.
connection.
asyncSendPacket(this.
consumerInfo.
createRemoveCommand());
}
}
public void
dispose() {
if (!
closed) {
this.
connection.
removeDispatcher(
consumerInfo.
getConsumerId());
this.
connection.
removeConnectionConsumer(this);
closed = true;
}
}
public void
dispatch(
MessageDispatch messageDispatch) {
try {
messageDispatch.
setConsumer(this);
ServerSession serverSession =
sessionPool.
getServerSession();
Session s =
serverSession.
getSession();
ActiveMQSession session = null;
if (
s instanceof
ActiveMQSession) {
session = (
ActiveMQSession)
s;
} else if (
s instanceof
ActiveMQTopicSession) {
ActiveMQTopicSession topicSession = (
ActiveMQTopicSession)
s;
session = (
ActiveMQSession)
topicSession.
getNext();
} else if (
s instanceof
ActiveMQQueueSession) {
ActiveMQQueueSession queueSession = (
ActiveMQQueueSession)
s;
session = (
ActiveMQSession)
queueSession.
getNext();
} else {
connection.
onClientInternalException(new
JMSException("Session pool provided an invalid session type: " +
s.
getClass()));
return;
}
session.
dispatch(
messageDispatch);
serverSession.
start();
} catch (
JMSException e) {
connection.
onAsyncException(
e);
}
}
public
String toString() {
return "ActiveMQConnectionConsumer { value=" +
consumerInfo.
getConsumerId() + " }";
}
public void
clearMessagesInProgress() {
// future: may want to deal with rollback of in progress messages to track re deliveries
// before indicating that all is complete.
// Till there is a need, lets immediately allow dispatch
this.
connection.
transportInterruptionProcessingComplete();
}
public
ConsumerInfo getConsumerInfo() {
return
consumerInfo;
}
}