/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq;
import java.util.
HashMap;
import java.util.
Map;
import java.util.concurrent.atomic.
AtomicLong;
import javax.jms.
Destination;
import javax.jms.
IllegalStateException;
import javax.jms.
InvalidDestinationException;
import javax.jms.
JMSException;
import javax.jms.
Message;
import org.apache.activemq.command.
ActiveMQDestination;
import org.apache.activemq.command.
ProducerAck;
import org.apache.activemq.command.
ProducerId;
import org.apache.activemq.command.
ProducerInfo;
import org.apache.activemq.management.
JMSProducerStatsImpl;
import org.apache.activemq.management.
StatsCapable;
import org.apache.activemq.management.
StatsImpl;
import org.apache.activemq.usage.
MemoryUsage;
import org.apache.activemq.util.
IntrospectionSupport;
import org.slf4j.
Logger;
import org.slf4j.
LoggerFactory;
/**
* A client uses a <CODE>MessageProducer</CODE> object to send messages to a
* destination. A <CODE>MessageProducer</CODE> object is created by passing a
* <CODE>Destination</CODE> object to a message-producer creation method
* supplied by a session.
* <P>
* <CODE>MessageProducer</CODE> is the parent interface for all message
* producers.
* <P>
* A client also has the option of creating a message producer without supplying
* a destination. In this case, a destination must be provided with every send
* operation. A typical use for this kind of message producer is to send replies
* to requests using the request's <CODE>JMSReplyTo</CODE> destination.
* <P>
* A client can specify a default delivery mode, priority, and time to live for
* messages sent by a message producer. It can also specify the delivery mode,
* priority, and time to live for an individual message.
* <P>
* A client can specify a time-to-live value in milliseconds for each message it
* sends. This value defines a message expiration time that is the sum of the
* message's time-to-live and the GMT when it is sent (for transacted sends,
* this is the time the client sends the message, not the time the transaction
* is committed).
* <P>
* A JMS provider should do its best to expire messages accurately; however, the
* JMS API does not define the accuracy provided.
*
*
* @see javax.jms.TopicPublisher
* @see javax.jms.QueueSender
* @see javax.jms.Session#createProducer
*/
public class
ActiveMQMessageProducer extends
ActiveMQMessageProducerSupport implements
StatsCapable,
Disposable {
private static final
Logger LOG =
LoggerFactory.
getLogger(
ActiveMQMessageProducer.class);
protected
ProducerInfo info;
protected boolean
closed;
private final
JMSProducerStatsImpl stats;
private
AtomicLong messageSequence;
private final long
startTime;
private
MessageTransformer transformer;
private
MemoryUsage producerWindow;
protected
ActiveMQMessageProducer(
ActiveMQSession session,
ProducerId producerId,
ActiveMQDestination destination, int
sendTimeout) throws
JMSException {
super(
session);
this.
info = new
ProducerInfo(
producerId);
this.
info.
setWindowSize(
session.
connection.
getProducerWindowSize());
// Allows the options on the destination to configure the producerInfo
if (
destination != null &&
destination.
getOptions() != null) {
Map<
String,
Object>
options =
IntrospectionSupport.
extractProperties(
new
HashMap<
String,
Object>(
destination.
getOptions()), "producer.");
IntrospectionSupport.
setProperties(this.
info,
options);
if (
options.
size() > 0) {
String msg = "There are " +
options.
size()
+ " producer options that couldn't be set on the producer."
+ " Check the options are spelled correctly."
+ " Unknown parameters=[" +
options + "]."
+ " This producer cannot be started.";
LOG.
warn(
msg);
throw new
ConfigurationException(
msg);
}
}
this.
info.
setDestination(
destination);
// Enable producer window flow control if protocol > 3 and the window
// size > 0
if (
session.
connection.
getProtocolVersion() >= 3 && this.
info.
getWindowSize() > 0) {
producerWindow = new
MemoryUsage("Producer Window: " +
producerId);
producerWindow.
setExecutor(
session.
getConnectionExecutor());
producerWindow.
setLimit(this.
info.
getWindowSize());
producerWindow.
start();
}
this.
defaultDeliveryMode =
Message.
DEFAULT_DELIVERY_MODE;
this.
defaultPriority =
Message.
DEFAULT_PRIORITY;
this.
defaultTimeToLive =
Message.
DEFAULT_TIME_TO_LIVE;
this.
startTime =
System.
currentTimeMillis();
this.
messageSequence = new
AtomicLong(0);
this.
stats = new
JMSProducerStatsImpl(
session.
getSessionStats(),
destination);
try {
this.
session.
addProducer(this);
this.
session.
syncSendPacket(
info);
} catch (
JMSException e) {
this.
session.
removeProducer(this);
throw
e;
}
this.
setSendTimeout(
sendTimeout);
setTransformer(
session.
getTransformer());
}
@
Override
public
StatsImpl getStats() {
return
stats;
}
public
JMSProducerStatsImpl getProducerStats() {
return
stats;
}
/**
* Gets the destination associated with this <CODE>MessageProducer</CODE>.
*
* @return this producer's <CODE>Destination/ <CODE>
* @throws JMSException if the JMS provider fails to close the producer due to
* some internal error.
* @since 1.1
*/
@
Override
public
Destination getDestination() throws
JMSException {
checkClosed();
return this.
info.
getDestination();
}
/**
* Closes the message producer.
* <P>
* Since a provider may allocate some resources on behalf of a <CODE>
* MessageProducer</CODE>
* outside the Java virtual machine, clients should close them when they are
* not needed. Relying on garbage collection to eventually reclaim these
* resources may not be timely enough.
*
* @throws JMSException if the JMS provider fails to close the producer due
* to some internal error.
*/
@
Override
public void
close() throws
JMSException {
if (!
closed) {
dispose();
this.
session.
asyncSendPacket(
info.
createRemoveCommand());
}
}
@
Override
public void
dispose() {
if (!
closed) {
this.
session.
removeProducer(this);
if (
producerWindow != null) {
producerWindow.
stop();
}
closed = true;
}
}
/**
* Check if the instance of this producer has been closed.
*
* @throws IllegalStateException
*/
@
Override
protected void
checkClosed() throws
IllegalStateException {
if (
closed) {
throw new
IllegalStateException("The producer is closed");
}
}
/**
* Sends a message to a destination for an unidentified message producer,
* specifying delivery mode, priority and time to live.
* <P>
* Typically, a message producer is assigned a destination at creation time;
* however, the JMS API also supports unidentified message producers, which
* require that the destination be supplied every time a message is sent.
*
* @param destination the destination to send this message to
* @param message the message to send
* @param deliveryMode the delivery mode to use
* @param priority the priority for this message
* @param timeToLive the message's lifetime (in milliseconds)
* @throws JMSException if the JMS provider fails to send the message due to
* some internal error.
* @throws UnsupportedOperationException if an invalid destination is
* specified.
* @throws InvalidDestinationException if a client uses this method with an
* invalid destination.
* @see javax.jms.Session#createProducer
* @since 1.1
*/
@
Override
public void
send(
Destination destination,
Message message, int
deliveryMode, int
priority, long
timeToLive) throws
JMSException {
this.
send(
destination,
message,
deliveryMode,
priority,
timeToLive, null);
}
public void
send(
Message message,
AsyncCallback onComplete) throws
JMSException {
this.
send(this.
getDestination(),
message,
this.
defaultDeliveryMode,
this.
defaultPriority,
this.
defaultTimeToLive,
onComplete);
}
public void
send(
Destination destination,
Message message,
AsyncCallback onComplete) throws
JMSException {
this.
send(
destination,
message,
this.
defaultDeliveryMode,
this.
defaultPriority,
this.
defaultTimeToLive,
onComplete);
}
public void
send(
Message message, int
deliveryMode, int
priority, long
timeToLive,
AsyncCallback onComplete) throws
JMSException {
this.
send(this.
getDestination(),
message,
deliveryMode,
priority,
timeToLive,
onComplete);
}
public void
send(
Destination destination,
Message message, int
deliveryMode, int
priority, long
timeToLive,
AsyncCallback onComplete) throws
JMSException {
checkClosed();
if (
destination == null) {
if (
info.
getDestination() == null) {
throw new
UnsupportedOperationException("A destination must be specified.");
}
throw new
InvalidDestinationException("Don't understand null destinations");
}
ActiveMQDestination dest;
if (
destination.
equals(
info.
getDestination())) {
dest = (
ActiveMQDestination)
destination;
} else if (
info.
getDestination() == null) {
dest =
ActiveMQDestination.
transform(
destination);
} else {
throw new
UnsupportedOperationException("This producer can only send messages to: " + this.
info.
getDestination().
getPhysicalName());
}
if (
dest == null) {
throw new
JMSException("No destination specified");
}
if (
transformer != null) {
Message transformedMessage =
transformer.
producerTransform(
session, this,
message);
if (
transformedMessage != null) {
message =
transformedMessage;
}
}
if (
producerWindow != null) {
try {
producerWindow.
waitForSpace();
} catch (
InterruptedException e) {
throw new
JMSException("Send aborted due to thread interrupt.");
}
}
this.
session.
send(this,
dest,
message,
deliveryMode,
priority,
timeToLive,
producerWindow,
sendTimeout,
onComplete);
stats.
onMessage();
}
public
MessageTransformer getTransformer() {
return
transformer;
}
/**
* Sets the transformer used to transform messages before they are sent on
* to the JMS bus
*/
public void
setTransformer(
MessageTransformer transformer) {
this.
transformer =
transformer;
}
/**
* @return the time in milli second when this object was created.
*/
protected long
getStartTime() {
return this.
startTime;
}
/**
* @return Returns the messageSequence.
*/
protected long
getMessageSequence() {
return
messageSequence.
incrementAndGet();
}
/**
* @param messageSequence The messageSequence to set.
*/
protected void
setMessageSequence(
AtomicLong messageSequence) {
this.
messageSequence =
messageSequence;
}
/**
* @return Returns the info.
*/
protected
ProducerInfo getProducerInfo() {
return this.
info != null ? this.
info : null;
}
/**
* @param info The info to set
*/
protected void
setProducerInfo(
ProducerInfo info) {
this.
info =
info;
}
@
Override
public
String toString() {
return "ActiveMQMessageProducer { value=" +
info.
getProducerId() + " }";
}
public void
onProducerAck(
ProducerAck pa) {
if (this.
producerWindow != null) {
this.
producerWindow.
decreaseUsage(
pa.
getSize());
}
}
}