/*
* Copyright (c) 1999, 2013, Oracle and/or its affiliates. All rights reserved.
* ORACLE PROPRIETARY/CONFIDENTIAL. Use is subject to license terms.
*
*
*
*
*
*
*
*
*
*
*
*
*
*
*
*
*
*
*
*
*/
package javax.management.monitor;
import static com.sun.jmx.defaults.
JmxProperties.
MONITOR_LOGGER;
import com.sun.jmx.mbeanserver.
GetPropertyAction;
import com.sun.jmx.mbeanserver.
Introspector;
import java.io.
IOException;
import java.security.
AccessControlContext;
import java.security.
AccessController;
import java.security.
PrivilegedAction;
import java.security.
ProtectionDomain;
import java.util.
List;
import java.util.
Map;
import java.util.
WeakHashMap;
import java.util.concurrent.
CopyOnWriteArrayList;
import java.util.concurrent.
Executors;
import java.util.concurrent.
Future;
import java.util.concurrent.
LinkedBlockingQueue;
import java.util.concurrent.
ScheduledExecutorService;
import java.util.concurrent.
ScheduledFuture;
import java.util.concurrent.
ThreadFactory;
import java.util.concurrent.
ThreadPoolExecutor;
import java.util.concurrent.
TimeUnit;
import java.util.concurrent.atomic.
AtomicInteger;
import java.util.concurrent.atomic.
AtomicLong;
import java.util.logging.
Level;
import javax.management.
AttributeNotFoundException;
import javax.management.
InstanceNotFoundException;
import javax.management.
IntrospectionException;
import javax.management.
MBeanAttributeInfo;
import javax.management.
MBeanException;
import javax.management.
MBeanInfo;
import javax.management.
MBeanRegistration;
import javax.management.
MBeanServer;
import javax.management.
MBeanServerConnection;
import javax.management.
NotificationBroadcasterSupport;
import javax.management.
ObjectName;
import javax.management.
ReflectionException;
import static javax.management.monitor.
MonitorNotification.*;
/**
* Defines the part common to all monitor MBeans.
* A monitor MBean monitors values of an attribute common to a set of observed
* MBeans. The observed attribute is monitored at intervals specified by the
* granularity period. A gauge value (derived gauge) is derived from the values
* of the observed attribute.
*
*
* @since 1.5
*/
public abstract class
Monitor
extends
NotificationBroadcasterSupport
implements
MonitorMBean,
MBeanRegistration {
/*
* ------------------------------------------
* PACKAGE CLASSES
* ------------------------------------------
*/
static class
ObservedObject {
public
ObservedObject(
ObjectName observedObject) {
this.
observedObject =
observedObject;
}
public final
ObjectName getObservedObject() {
return
observedObject;
}
public final synchronized int
getAlreadyNotified() {
return
alreadyNotified;
}
public final synchronized void
setAlreadyNotified(int
alreadyNotified) {
this.
alreadyNotified =
alreadyNotified;
}
public final synchronized
Object getDerivedGauge() {
return
derivedGauge;
}
public final synchronized void
setDerivedGauge(
Object derivedGauge) {
this.
derivedGauge =
derivedGauge;
}
public final synchronized long
getDerivedGaugeTimeStamp() {
return
derivedGaugeTimeStamp;
}
public final synchronized void
setDerivedGaugeTimeStamp(
long
derivedGaugeTimeStamp) {
this.
derivedGaugeTimeStamp =
derivedGaugeTimeStamp;
}
private final
ObjectName observedObject;
private int
alreadyNotified;
private
Object derivedGauge;
private long
derivedGaugeTimeStamp;
}
/*
* ------------------------------------------
* PRIVATE VARIABLES
* ------------------------------------------
*/
/**
* Attribute to observe.
*/
private
String observedAttribute;
/**
* Monitor granularity period (in milliseconds).
* The default value is set to 10 seconds.
*/
private long
granularityPeriod = 10000;
/**
* Monitor state.
* The default value is set to <CODE>false</CODE>.
*/
private boolean
isActive = false;
/**
* Monitor sequence number.
* The default value is set to 0.
*/
private final
AtomicLong sequenceNumber = new
AtomicLong();
/**
* Complex type attribute flag.
* The default value is set to <CODE>false</CODE>.
*/
private boolean
isComplexTypeAttribute = false;
/**
* First attribute name extracted from complex type attribute name.
*/
private
String firstAttribute;
/**
* Remaining attribute names extracted from complex type attribute name.
*/
private final
List<
String>
remainingAttributes =
new
CopyOnWriteArrayList<
String>();
/**
* AccessControlContext of the Monitor.start() caller.
*/
private static final
AccessControlContext noPermissionsACC =
new
AccessControlContext(
new
ProtectionDomain[] {new
ProtectionDomain(null, null)});
private volatile
AccessControlContext acc =
noPermissionsACC;
/**
* Scheduler Service.
*/
private static final
ScheduledExecutorService scheduler =
Executors.
newSingleThreadScheduledExecutor(
new
DaemonThreadFactory("Scheduler"));
/**
* Map containing the thread pool executor per thread group.
*/
private static final
Map<
ThreadPoolExecutor,
Void>
executors =
new
WeakHashMap<
ThreadPoolExecutor,
Void>();
/**
* Lock for executors map.
*/
private static final
Object executorsLock = new
Object();
/**
* Maximum Pool Size
*/
private static final int
maximumPoolSize;
static {
final
String maximumPoolSizeSysProp = "jmx.x.monitor.maximum.pool.size";
final
String maximumPoolSizeStr =
AccessController.
doPrivileged(
new
GetPropertyAction(
maximumPoolSizeSysProp));
if (
maximumPoolSizeStr == null ||
maximumPoolSizeStr.
trim().
length() == 0) {
maximumPoolSize = 10;
} else {
int
maximumPoolSizeTmp = 10;
try {
maximumPoolSizeTmp =
Integer.
parseInt(
maximumPoolSizeStr);
} catch (
NumberFormatException e) {
if (
MONITOR_LOGGER.
isLoggable(
Level.
FINER)) {
MONITOR_LOGGER.
logp(
Level.
FINER,
Monitor.class.
getName(),
"<static initializer>",
"Wrong value for " +
maximumPoolSizeSysProp +
" system property",
e);
MONITOR_LOGGER.
logp(
Level.
FINER,
Monitor.class.
getName(),
"<static initializer>",
maximumPoolSizeSysProp + " defaults to 10");
}
maximumPoolSizeTmp = 10;
}
if (
maximumPoolSizeTmp < 1) {
maximumPoolSize = 1;
} else {
maximumPoolSize =
maximumPoolSizeTmp;
}
}
}
/**
* Future associated to the current monitor task.
*/
private
Future<?>
monitorFuture;
/**
* Scheduler task to be executed by the Scheduler Service.
*/
private final
SchedulerTask schedulerTask = new
SchedulerTask();
/**
* ScheduledFuture associated to the current scheduler task.
*/
private
ScheduledFuture<?>
schedulerFuture;
/*
* ------------------------------------------
* PROTECTED VARIABLES
* ------------------------------------------
*/
/**
* The amount by which the capacity of the monitor arrays are
* automatically incremented when their size becomes greater than
* their capacity.
*/
protected final static int
capacityIncrement = 16;
/**
* The number of valid components in the vector of observed objects.
*
*/
protected int
elementCount = 0;
/**
* Monitor errors that have already been notified.
* @deprecated equivalent to {@link #alreadyNotifieds}[0].
*/
@
Deprecated
protected int
alreadyNotified = 0;
/**
* <p>Selected monitor errors that have already been notified.</p>
*
* <p>Each element in this array corresponds to an observed object
* in the vector. It contains a bit mask of the flags {@link
* #OBSERVED_OBJECT_ERROR_NOTIFIED} etc, indicating whether the
* corresponding notification has already been sent for the MBean
* being monitored.</p>
*
*/
protected int
alreadyNotifieds[] = new int[
capacityIncrement];
/**
* Reference to the MBean server. This reference is null when the
* monitor MBean is not registered in an MBean server. This
* reference is initialized before the monitor MBean is registered
* in the MBean server.
* @see #preRegister(MBeanServer server, ObjectName name)
*/
protected
MBeanServer server;
// Flags defining possible monitor errors.
//
/**
* This flag is used to reset the {@link #alreadyNotifieds
* alreadyNotifieds} monitor attribute.
*/
protected static final int
RESET_FLAGS_ALREADY_NOTIFIED = 0;
/**
* Flag denoting that a notification has occurred after changing
* the observed object. This flag is used to check that the new
* observed object is registered in the MBean server at the time
* of the first notification.
*/
protected static final int
OBSERVED_OBJECT_ERROR_NOTIFIED = 1;
/**
* Flag denoting that a notification has occurred after changing
* the observed attribute. This flag is used to check that the
* new observed attribute belongs to the observed object at the
* time of the first notification.
*/
protected static final int
OBSERVED_ATTRIBUTE_ERROR_NOTIFIED = 2;
/**
* Flag denoting that a notification has occurred after changing
* the observed object or the observed attribute. This flag is
* used to check that the observed attribute type is correct
* (depending on the monitor in use) at the time of the first
* notification.
*/
protected static final int
OBSERVED_ATTRIBUTE_TYPE_ERROR_NOTIFIED = 4;
/**
* Flag denoting that a notification has occurred after changing
* the observed object or the observed attribute. This flag is
* used to notify any exception (except the cases described above)
* when trying to get the value of the observed attribute at the
* time of the first notification.
*/
protected static final int
RUNTIME_ERROR_NOTIFIED = 8;
/**
* This field is retained for compatibility but should not be referenced.
*
* @deprecated No replacement.
*/
@
Deprecated
protected
String dbgTag =
Monitor.class.
getName();
/*
* ------------------------------------------
* PACKAGE VARIABLES
* ------------------------------------------
*/
/**
* List of ObservedObjects to which the attribute to observe belongs.
*/
final
List<
ObservedObject>
observedObjects =
new
CopyOnWriteArrayList<
ObservedObject>();
/**
* Flag denoting that a notification has occurred after changing
* the threshold. This flag is used to notify any exception
* related to invalid thresholds settings.
*/
static final int
THRESHOLD_ERROR_NOTIFIED = 16;
/**
* Enumeration used to keep trace of the derived gauge type
* in counter and gauge monitors.
*/
enum
NumericalType { BYTE, SHORT, INTEGER, LONG, FLOAT, DOUBLE };
/**
* Constant used to initialize all the numeric values.
*/
static final
Integer INTEGER_ZERO = 0;
/*
* ------------------------------------------
* PUBLIC METHODS
* ------------------------------------------
*/
/**
* Allows the monitor MBean to perform any operations it needs
* before being registered in the MBean server.
* <P>
* Initializes the reference to the MBean server.
*
* @param server The MBean server in which the monitor MBean will
* be registered.
* @param name The object name of the monitor MBean.
*
* @return The name of the monitor MBean registered.
*
* @exception Exception
*/
public
ObjectName preRegister(
MBeanServer server,
ObjectName name)
throws
Exception {
MONITOR_LOGGER.
logp(
Level.
FINER,
Monitor.class.
getName(),
"preRegister(MBeanServer, ObjectName)",
"initialize the reference on the MBean server");
this.
server =
server;
return
name;
}
/**
* Allows the monitor MBean to perform any operations needed after
* having been registered in the MBean server or after the
* registration has failed.
* <P>
* Not used in this context.
*/
public void
postRegister(
Boolean registrationDone) {
}
/**
* Allows the monitor MBean to perform any operations it needs
* before being unregistered by the MBean server.
* <P>
* Stops the monitor.
*
* @exception Exception
*/
public void
preDeregister() throws
Exception {
MONITOR_LOGGER.
logp(
Level.
FINER,
Monitor.class.
getName(),
"preDeregister()", "stop the monitor");
// Stop the Monitor.
//
stop();
}
/**
* Allows the monitor MBean to perform any operations needed after
* having been unregistered by the MBean server.
* <P>
* Not used in this context.
*/
public void
postDeregister() {
}
/**
* Starts the monitor.
*/
public abstract void
start();
/**
* Stops the monitor.
*/
public abstract void
stop();
// GETTERS AND SETTERS
//--------------------
/**
* Returns the object name of the first object in the set of observed
* MBeans, or <code>null</code> if there is no such object.
*
* @return The object being observed.
*
* @see #setObservedObject(ObjectName)
*
* @deprecated As of JMX 1.2, replaced by {@link #getObservedObjects}
*/
@
Deprecated
public synchronized
ObjectName getObservedObject() {
if (
observedObjects.
isEmpty()) {
return null;
} else {
return
observedObjects.
get(0).
getObservedObject();
}
}
/**
* Removes all objects from the set of observed objects, and then adds the
* specified object.
*
* @param object The object to observe.
* @exception IllegalArgumentException The specified
* object is null.
*
* @see #getObservedObject()
*
* @deprecated As of JMX 1.2, replaced by {@link #addObservedObject}
*/
@
Deprecated
public synchronized void
setObservedObject(
ObjectName object)
throws
IllegalArgumentException {
if (
object == null)
throw new
IllegalArgumentException("Null observed object");
if (
observedObjects.
size() == 1 &&
containsObservedObject(
object))
return;
observedObjects.
clear();
addObservedObject(
object);
}
/**
* Adds the specified object in the set of observed MBeans, if this object
* is not already present.
*
* @param object The object to observe.
* @exception IllegalArgumentException The specified object is null.
*
*/
public synchronized void
addObservedObject(
ObjectName object)
throws
IllegalArgumentException {
if (
object == null) {
throw new
IllegalArgumentException("Null observed object");
}
// Check that the specified object is not already contained.
//
if (
containsObservedObject(
object))
return;
// Add the specified object in the list.
//
ObservedObject o =
createObservedObject(
object);
o.
setAlreadyNotified(
RESET_FLAGS_ALREADY_NOTIFIED);
o.
setDerivedGauge(
INTEGER_ZERO);
o.
setDerivedGaugeTimeStamp(
System.
currentTimeMillis());
observedObjects.
add(
o);
// Update legacy protected stuff.
//
createAlreadyNotified();
}
/**
* Removes the specified object from the set of observed MBeans.
*
* @param object The object to remove.
*
*/
public synchronized void
removeObservedObject(
ObjectName object) {
// Check for null object.
//
if (
object == null)
return;
final
ObservedObject o =
getObservedObject(
object);
if (
o != null) {
// Remove the specified object from the list.
//
observedObjects.
remove(
o);
// Update legacy protected stuff.
//
createAlreadyNotified();
}
}
/**
* Tests whether the specified object is in the set of observed MBeans.
*
* @param object The object to check.
* @return <CODE>true</CODE> if the specified object is present,
* <CODE>false</CODE> otherwise.
*
*/
public synchronized boolean
containsObservedObject(
ObjectName object) {
return
getObservedObject(
object) != null;
}
/**
* Returns an array containing the objects being observed.
*
* @return The objects being observed.
*
*/
public synchronized
ObjectName[]
getObservedObjects() {
ObjectName[]
names = new
ObjectName[
observedObjects.
size()];
for (int
i = 0;
i <
names.length;
i++)
names[
i] =
observedObjects.
get(
i).
getObservedObject();
return
names;
}
/**
* Gets the attribute being observed.
* <BR>The observed attribute is not initialized by default (set to null).
*
* @return The attribute being observed.
*
* @see #setObservedAttribute
*/
public synchronized
String getObservedAttribute() {
return
observedAttribute;
}
/**
* Sets the attribute to observe.
* <BR>The observed attribute is not initialized by default (set to null).
*
* @param attribute The attribute to observe.
* @exception IllegalArgumentException The specified
* attribute is null.
*
* @see #getObservedAttribute
*/
public void
setObservedAttribute(
String attribute)
throws
IllegalArgumentException {
if (
attribute == null) {
throw new
IllegalArgumentException("Null observed attribute");
}
// Update alreadyNotified array.
//
synchronized (this) {
if (
observedAttribute != null &&
observedAttribute.
equals(
attribute))
return;
observedAttribute =
attribute;
// Reset the complex type attribute information
// such that it is recalculated again.
//
cleanupIsComplexTypeAttribute();
int
index = 0;
for (
ObservedObject o :
observedObjects) {
resetAlreadyNotified(
o,
index++,
OBSERVED_ATTRIBUTE_ERROR_NOTIFIED |
OBSERVED_ATTRIBUTE_TYPE_ERROR_NOTIFIED);
}
}
}
/**
* Gets the granularity period (in milliseconds).
* <BR>The default value of the granularity period is 10 seconds.
*
* @return The granularity period value.
*
* @see #setGranularityPeriod
*/
public synchronized long
getGranularityPeriod() {
return
granularityPeriod;
}
/**
* Sets the granularity period (in milliseconds).
* <BR>The default value of the granularity period is 10 seconds.
*
* @param period The granularity period value.
* @exception IllegalArgumentException The granularity
* period is less than or equal to zero.
*
* @see #getGranularityPeriod
*/
public synchronized void
setGranularityPeriod(long
period)
throws
IllegalArgumentException {
if (
period <= 0) {
throw new
IllegalArgumentException("Nonpositive granularity " +
"period");
}
if (
granularityPeriod ==
period)
return;
granularityPeriod =
period;
// Reschedule the scheduler task if the monitor is active.
//
if (
isActive()) {
cleanupFutures();
schedulerFuture =
scheduler.
schedule(
schedulerTask,
period,
TimeUnit.
MILLISECONDS);
}
}
/**
* Tests whether the monitor MBean is active. A monitor MBean is
* marked active when the {@link #start start} method is called.
* It becomes inactive when the {@link #stop stop} method is
* called.
*
* @return <CODE>true</CODE> if the monitor MBean is active,
* <CODE>false</CODE> otherwise.
*/
/* This method must be synchronized so that the monitoring thread will
correctly see modifications to the isActive variable. See the MonitorTask
action executed by the Scheduled Executor Service. */
public synchronized boolean
isActive() {
return
isActive;
}
/*
* ------------------------------------------
* PACKAGE METHODS
* ------------------------------------------
*/
/**
* Starts the monitor.
*/
void
doStart() {
MONITOR_LOGGER.
logp(
Level.
FINER,
Monitor.class.
getName(),
"doStart()", "start the monitor");
synchronized (this) {
if (
isActive()) {
MONITOR_LOGGER.
logp(
Level.
FINER,
Monitor.class.
getName(),
"doStart()", "the monitor is already active");
return;
}
isActive = true;
// Reset the complex type attribute information
// such that it is recalculated again.
//
cleanupIsComplexTypeAttribute();
// Cache the AccessControlContext of the Monitor.start() caller.
// The monitor tasks will be executed within this context.
//
acc =
AccessController.
getContext();
// Start the scheduler.
//
cleanupFutures();
schedulerTask.
setMonitorTask(new
MonitorTask());
schedulerFuture =
scheduler.
schedule(
schedulerTask,
getGranularityPeriod(),
TimeUnit.
MILLISECONDS);
}
}
/**
* Stops the monitor.
*/
void
doStop() {
MONITOR_LOGGER.
logp(
Level.
FINER,
Monitor.class.
getName(),
"doStop()", "stop the monitor");
synchronized (this) {
if (!
isActive()) {
MONITOR_LOGGER.
logp(
Level.
FINER,
Monitor.class.
getName(),
"doStop()", "the monitor is not active");
return;
}
isActive = false;
// Cancel the scheduler task associated with the
// scheduler and its associated monitor task.
//
cleanupFutures();
// Reset the AccessControlContext.
//
acc =
noPermissionsACC;
// Reset the complex type attribute information
// such that it is recalculated again.
//
cleanupIsComplexTypeAttribute();
}
}
/**
* Gets the derived gauge of the specified object, if this object is
* contained in the set of observed MBeans, or <code>null</code> otherwise.
*
* @param object the name of the object whose derived gauge is to
* be returned.
*
* @return The derived gauge of the specified object.
*
* @since 1.6
*/
synchronized
Object getDerivedGauge(
ObjectName object) {
final
ObservedObject o =
getObservedObject(
object);
return
o == null ? null :
o.
getDerivedGauge();
}
/**
* Gets the derived gauge timestamp of the specified object, if
* this object is contained in the set of observed MBeans, or
* <code>0</code> otherwise.
*
* @param object the name of the object whose derived gauge
* timestamp is to be returned.
*
* @return The derived gauge timestamp of the specified object.
*
*/
synchronized long
getDerivedGaugeTimeStamp(
ObjectName object) {
final
ObservedObject o =
getObservedObject(
object);
return
o == null ? 0 :
o.
getDerivedGaugeTimeStamp();
}
Object getAttribute(
MBeanServerConnection mbsc,
ObjectName object,
String attribute)
throws
AttributeNotFoundException,
InstanceNotFoundException,
MBeanException,
ReflectionException,
IOException {
// Check for "ObservedAttribute" replacement.
// This could happen if a thread A called setObservedAttribute()
// while other thread B was in the middle of the monitor() method
// and received the old observed attribute value.
//
final boolean
lookupMBeanInfo;
synchronized (this) {
if (!
isActive())
throw new
IllegalArgumentException(
"The monitor has been stopped");
if (!
attribute.
equals(
getObservedAttribute()))
throw new
IllegalArgumentException(
"The observed attribute has been changed");
lookupMBeanInfo =
(
firstAttribute == null &&
attribute.
indexOf('.') != -1);
}
// Look up MBeanInfo if needed
//
final
MBeanInfo mbi;
if (
lookupMBeanInfo) {
try {
mbi =
mbsc.
getMBeanInfo(
object);
} catch (
IntrospectionException e) {
throw new
IllegalArgumentException(
e);
}
} else {
mbi = null;
}
// Check for complex type attribute
//
final
String fa;
synchronized (this) {
if (!
isActive())
throw new
IllegalArgumentException(
"The monitor has been stopped");
if (!
attribute.
equals(
getObservedAttribute()))
throw new
IllegalArgumentException(
"The observed attribute has been changed");
if (
firstAttribute == null) {
if (
attribute.
indexOf('.') != -1) {
MBeanAttributeInfo mbaiArray[] =
mbi.
getAttributes();
for (
MBeanAttributeInfo mbai :
mbaiArray) {
if (
attribute.
equals(
mbai.
getName())) {
firstAttribute =
attribute;
break;
}
}
if (
firstAttribute == null) {
String tokens[] =
attribute.
split("\\.", -1);
firstAttribute =
tokens[0];
for (int
i = 1;
i <
tokens.length;
i++)
remainingAttributes.
add(
tokens[
i]);
isComplexTypeAttribute = true;
}
} else {
firstAttribute =
attribute;
}
}
fa =
firstAttribute;
}
return
mbsc.
getAttribute(
object,
fa);
}
Comparable<?>
getComparableFromAttribute(
ObjectName object,
String attribute,
Object value)
throws
AttributeNotFoundException {
if (
isComplexTypeAttribute) {
Object v =
value;
for (
String attr :
remainingAttributes)
v =
Introspector.
elementFromComplex(
v,
attr);
return (
Comparable<?>)
v;
} else {
return (
Comparable<?>)
value;
}
}
boolean
isComparableTypeValid(
ObjectName object,
String attribute,
Comparable<?>
value) {
return true;
}
String buildErrorNotification(
ObjectName object,
String attribute,
Comparable<?>
value) {
return null;
}
void
onErrorNotification(
MonitorNotification notification) {
}
Comparable<?>
getDerivedGaugeFromComparable(
ObjectName object,
String attribute,
Comparable<?>
value) {
return (
Comparable<?>)
value;
}
MonitorNotification buildAlarmNotification(
ObjectName object,
String attribute,
Comparable<?>
value){
return null;
}
boolean
isThresholdTypeValid(
ObjectName object,
String attribute,
Comparable<?>
value) {
return true;
}
static
Class<? extends
Number>
classForType(
NumericalType type) {
switch (
type) {
case
BYTE:
return
Byte.class;
case
SHORT:
return
Short.class;
case
INTEGER:
return
Integer.class;
case
LONG:
return
Long.class;
case
FLOAT:
return
Float.class;
case
DOUBLE:
return
Double.class;
default:
throw new
IllegalArgumentException(
"Unsupported numerical type");
}
}
static boolean
isValidForType(
Object value,
Class<? extends
Number>
c) {
return ((
value ==
INTEGER_ZERO) ||
c.
isInstance(
value));
}
/**
* Get the specified {@code ObservedObject} if this object is
* contained in the set of observed MBeans, or {@code null}
* otherwise.
*
* @param object the name of the {@code ObservedObject} to retrieve.
*
* @return The {@code ObservedObject} associated to the supplied
* {@code ObjectName}.
*
* @since 1.6
*/
synchronized
ObservedObject getObservedObject(
ObjectName object) {
for (
ObservedObject o :
observedObjects)
if (
o.
getObservedObject().
equals(
object))
return
o;
return null;
}
/**
* Factory method for ObservedObject creation.
*
* @since 1.6
*/
ObservedObject createObservedObject(
ObjectName object) {
return new
ObservedObject(
object);
}
/**
* Create the {@link #alreadyNotified} array from
* the {@code ObservedObject} array list.
*/
synchronized void
createAlreadyNotified() {
// Update elementCount.
//
elementCount =
observedObjects.
size();
// Update arrays.
//
alreadyNotifieds = new int[
elementCount];
for (int
i = 0;
i <
elementCount;
i++) {
alreadyNotifieds[
i] =
observedObjects.
get(
i).
getAlreadyNotified();
}
updateDeprecatedAlreadyNotified();
}
/**
* Update the deprecated {@link #alreadyNotified} field.
*/
synchronized void
updateDeprecatedAlreadyNotified() {
if (
elementCount > 0)
alreadyNotified =
alreadyNotifieds[0];
else
alreadyNotified = 0;
}
/**
* Update the {@link #alreadyNotifieds} array element at the given index
* with the already notified flag in the given {@code ObservedObject}.
* Ensure the deprecated {@link #alreadyNotified} field is updated
* if appropriate.
*/
synchronized void
updateAlreadyNotified(
ObservedObject o, int
index) {
alreadyNotifieds[
index] =
o.
getAlreadyNotified();
if (
index == 0)
updateDeprecatedAlreadyNotified();
}
/**
* Check if the given bits in the given element of {@link #alreadyNotifieds}
* are set.
*/
synchronized boolean
isAlreadyNotified(
ObservedObject o, int
mask) {
return ((
o.
getAlreadyNotified() &
mask) != 0);
}
/**
* Set the given bits in the given element of {@link #alreadyNotifieds}.
* Ensure the deprecated {@link #alreadyNotified} field is updated
* if appropriate.
*/
synchronized void
setAlreadyNotified(
ObservedObject o, int
index,
int
mask, int
an[]) {
final int
i =
computeAlreadyNotifiedIndex(
o,
index,
an);
if (
i == -1)
return;
o.
setAlreadyNotified(
o.
getAlreadyNotified() |
mask);
updateAlreadyNotified(
o,
i);
}
/**
* Reset the given bits in the given element of {@link #alreadyNotifieds}.
* Ensure the deprecated {@link #alreadyNotified} field is updated
* if appropriate.
*/
synchronized void
resetAlreadyNotified(
ObservedObject o,
int
index, int
mask) {
o.
setAlreadyNotified(
o.
getAlreadyNotified() & ~
mask);
updateAlreadyNotified(
o,
index);
}
/**
* Reset all bits in the given element of {@link #alreadyNotifieds}.
* Ensure the deprecated {@link #alreadyNotified} field is updated
* if appropriate.
*/
synchronized void
resetAllAlreadyNotified(
ObservedObject o,
int
index, int
an[]) {
final int
i =
computeAlreadyNotifiedIndex(
o,
index,
an);
if (
i == -1)
return;
o.
setAlreadyNotified(
RESET_FLAGS_ALREADY_NOTIFIED);
updateAlreadyNotified(
o,
index);
}
/**
* Check if the {@link #alreadyNotifieds} array has been modified.
* If true recompute the index for the given observed object.
*/
synchronized int
computeAlreadyNotifiedIndex(
ObservedObject o,
int
index, int
an[]) {
if (
an ==
alreadyNotifieds) {
return
index;
} else {
return
observedObjects.
indexOf(
o);
}
}
/*
* ------------------------------------------
* PRIVATE METHODS
* ------------------------------------------
*/
/**
* This method is used by the monitor MBean to create and send a
* monitor notification to all the listeners registered for this
* kind of notification.
*
* @param type The notification type.
* @param timeStamp The notification emission date.
* @param msg The notification message.
* @param derGauge The derived gauge.
* @param trigger The threshold/string (depending on the monitor
* type) that triggered off the notification.
* @param object The ObjectName of the observed object that triggered
* off the notification.
* @param onError Flag indicating if this monitor notification is
* an error notification or an alarm notification.
*/
private void
sendNotification(
String type, long
timeStamp,
String msg,
Object derGauge,
Object trigger,
ObjectName object, boolean
onError) {
if (!
isActive())
return;
if (
MONITOR_LOGGER.
isLoggable(
Level.
FINER)) {
MONITOR_LOGGER.
logp(
Level.
FINER,
Monitor.class.
getName(),
"sendNotification", "send notification: " +
"\n\tNotification observed object = " +
object +
"\n\tNotification observed attribute = " +
observedAttribute +
"\n\tNotification derived gauge = " +
derGauge);
}
long
seqno =
sequenceNumber.
getAndIncrement();
MonitorNotification mn =
new
MonitorNotification(
type,
this,
seqno,
timeStamp,
msg,
object,
observedAttribute,
derGauge,
trigger);
if (
onError)
onErrorNotification(
mn);
sendNotification(
mn);
}
/**
* This method is called by the monitor each time
* the granularity period has been exceeded.
* @param o The observed object.
*/
private void
monitor(
ObservedObject o, int
index, int
an[]) {
String attribute;
String notifType = null;
String msg = null;
Object derGauge = null;
Object trigger = null;
ObjectName object;
Comparable<?>
value = null;
MonitorNotification alarm = null;
if (!
isActive())
return;
// Check that neither the observed object nor the
// observed attribute are null. If the observed
// object or observed attribute is null, this means
// that the monitor started before a complete
// initialization and nothing is done.
//
synchronized (this) {
object =
o.
getObservedObject();
attribute =
getObservedAttribute();
if (
object == null ||
attribute == null) {
return;
}
}
// Check that the observed object is registered in the
// MBean server and that the observed attribute
// belongs to the observed object.
//
Object attributeValue = null;
try {
attributeValue =
getAttribute(
server,
object,
attribute);
if (
attributeValue == null)
if (
isAlreadyNotified(
o,
OBSERVED_ATTRIBUTE_TYPE_ERROR_NOTIFIED))
return;
else {
notifType =
OBSERVED_ATTRIBUTE_TYPE_ERROR;
setAlreadyNotified(
o,
index,
OBSERVED_ATTRIBUTE_TYPE_ERROR_NOTIFIED,
an);
msg = "The observed attribute value is null.";
MONITOR_LOGGER.
logp(
Level.
FINEST,
Monitor.class.
getName(),
"monitor",
msg);
}
} catch (
NullPointerException np_ex) {
if (
isAlreadyNotified(
o,
RUNTIME_ERROR_NOTIFIED))
return;
else {
notifType =
RUNTIME_ERROR;
setAlreadyNotified(
o,
index,
RUNTIME_ERROR_NOTIFIED,
an);
msg =
"The monitor must be registered in the MBean " +
"server or an MBeanServerConnection must be " +
"explicitly supplied.";
MONITOR_LOGGER.
logp(
Level.
FINEST,
Monitor.class.
getName(),
"monitor",
msg);
MONITOR_LOGGER.
logp(
Level.
FINEST,
Monitor.class.
getName(),
"monitor",
np_ex.
toString());
}
} catch (
InstanceNotFoundException inf_ex) {
if (
isAlreadyNotified(
o,
OBSERVED_OBJECT_ERROR_NOTIFIED))
return;
else {
notifType =
OBSERVED_OBJECT_ERROR;
setAlreadyNotified(
o,
index,
OBSERVED_OBJECT_ERROR_NOTIFIED,
an);
msg =
"The observed object must be accessible in " +
"the MBeanServerConnection.";
MONITOR_LOGGER.
logp(
Level.
FINEST,
Monitor.class.
getName(),
"monitor",
msg);
MONITOR_LOGGER.
logp(
Level.
FINEST,
Monitor.class.
getName(),
"monitor",
inf_ex.
toString());
}
} catch (
AttributeNotFoundException anf_ex) {
if (
isAlreadyNotified(
o,
OBSERVED_ATTRIBUTE_ERROR_NOTIFIED))
return;
else {
notifType =
OBSERVED_ATTRIBUTE_ERROR;
setAlreadyNotified(
o,
index,
OBSERVED_ATTRIBUTE_ERROR_NOTIFIED,
an);
msg =
"The observed attribute must be accessible in " +
"the observed object.";
MONITOR_LOGGER.
logp(
Level.
FINEST,
Monitor.class.
getName(),
"monitor",
msg);
MONITOR_LOGGER.
logp(
Level.
FINEST,
Monitor.class.
getName(),
"monitor",
anf_ex.
toString());
}
} catch (
MBeanException mb_ex) {
if (
isAlreadyNotified(
o,
RUNTIME_ERROR_NOTIFIED))
return;
else {
notifType =
RUNTIME_ERROR;
setAlreadyNotified(
o,
index,
RUNTIME_ERROR_NOTIFIED,
an);
msg =
mb_ex.
getMessage() == null ? "" :
mb_ex.
getMessage();
MONITOR_LOGGER.
logp(
Level.
FINEST,
Monitor.class.
getName(),
"monitor",
msg);
MONITOR_LOGGER.
logp(
Level.
FINEST,
Monitor.class.
getName(),
"monitor",
mb_ex.
toString());
}
} catch (
ReflectionException ref_ex) {
if (
isAlreadyNotified(
o,
RUNTIME_ERROR_NOTIFIED)) {
return;
} else {
notifType =
RUNTIME_ERROR;
setAlreadyNotified(
o,
index,
RUNTIME_ERROR_NOTIFIED,
an);
msg =
ref_ex.
getMessage() == null ? "" :
ref_ex.
getMessage();
MONITOR_LOGGER.
logp(
Level.
FINEST,
Monitor.class.
getName(),
"monitor",
msg);
MONITOR_LOGGER.
logp(
Level.
FINEST,
Monitor.class.
getName(),
"monitor",
ref_ex.
toString());
}
} catch (
IOException io_ex) {
if (
isAlreadyNotified(
o,
RUNTIME_ERROR_NOTIFIED))
return;
else {
notifType =
RUNTIME_ERROR;
setAlreadyNotified(
o,
index,
RUNTIME_ERROR_NOTIFIED,
an);
msg =
io_ex.
getMessage() == null ? "" :
io_ex.
getMessage();
MONITOR_LOGGER.
logp(
Level.
FINEST,
Monitor.class.
getName(),
"monitor",
msg);
MONITOR_LOGGER.
logp(
Level.
FINEST,
Monitor.class.
getName(),
"monitor",
io_ex.
toString());
}
} catch (
RuntimeException rt_ex) {
if (
isAlreadyNotified(
o,
RUNTIME_ERROR_NOTIFIED))
return;
else {
notifType =
RUNTIME_ERROR;
setAlreadyNotified(
o,
index,
RUNTIME_ERROR_NOTIFIED,
an);
msg =
rt_ex.
getMessage() == null ? "" :
rt_ex.
getMessage();
MONITOR_LOGGER.
logp(
Level.
FINEST,
Monitor.class.
getName(),
"monitor",
msg);
MONITOR_LOGGER.
logp(
Level.
FINEST,
Monitor.class.
getName(),
"monitor",
rt_ex.
toString());
}
}
synchronized (this) {
// Check if the monitor has been stopped.
//
if (!
isActive())
return;
// Check if the observed attribute has been changed.
//
// Avoid race condition where mbs.getAttribute() succeeded but
// another thread replaced the observed attribute meanwhile.
//
// Avoid setting computed derived gauge on erroneous attribute.
//
if (!
attribute.
equals(
getObservedAttribute()))
return;
// Derive a Comparable object from the ObservedAttribute value
// if the type of the ObservedAttribute value is a complex type.
//
if (
msg == null) {
try {
value =
getComparableFromAttribute(
object,
attribute,
attributeValue);
} catch (
ClassCastException e) {
if (
isAlreadyNotified(
o,
OBSERVED_ATTRIBUTE_TYPE_ERROR_NOTIFIED))
return;
else {
notifType =
OBSERVED_ATTRIBUTE_TYPE_ERROR;
setAlreadyNotified(
o,
index,
OBSERVED_ATTRIBUTE_TYPE_ERROR_NOTIFIED,
an);
msg =
"The observed attribute value does not " +
"implement the Comparable interface.";
MONITOR_LOGGER.
logp(
Level.
FINEST,
Monitor.class.
getName(), "monitor",
msg);
MONITOR_LOGGER.
logp(
Level.
FINEST,
Monitor.class.
getName(), "monitor",
e.
toString());
}
} catch (
AttributeNotFoundException e) {
if (
isAlreadyNotified(
o,
OBSERVED_ATTRIBUTE_ERROR_NOTIFIED))
return;
else {
notifType =
OBSERVED_ATTRIBUTE_ERROR;
setAlreadyNotified(
o,
index,
OBSERVED_ATTRIBUTE_ERROR_NOTIFIED,
an);
msg =
"The observed attribute must be accessible in " +
"the observed object.";
MONITOR_LOGGER.
logp(
Level.
FINEST,
Monitor.class.
getName(), "monitor",
msg);
MONITOR_LOGGER.
logp(
Level.
FINEST,
Monitor.class.
getName(), "monitor",
e.
toString());
}
} catch (
RuntimeException e) {
if (
isAlreadyNotified(
o,
RUNTIME_ERROR_NOTIFIED))
return;
else {
notifType =
RUNTIME_ERROR;
setAlreadyNotified(
o,
index,
RUNTIME_ERROR_NOTIFIED,
an);
msg =
e.
getMessage() == null ? "" :
e.
getMessage();
MONITOR_LOGGER.
logp(
Level.
FINEST,
Monitor.class.
getName(), "monitor",
msg);
MONITOR_LOGGER.
logp(
Level.
FINEST,
Monitor.class.
getName(), "monitor",
e.
toString());
}
}
}
// Check that the observed attribute type is supported by this
// monitor.
//
if (
msg == null) {
if (!
isComparableTypeValid(
object,
attribute,
value)) {
if (
isAlreadyNotified(
o,
OBSERVED_ATTRIBUTE_TYPE_ERROR_NOTIFIED))
return;
else {
notifType =
OBSERVED_ATTRIBUTE_TYPE_ERROR;
setAlreadyNotified(
o,
index,
OBSERVED_ATTRIBUTE_TYPE_ERROR_NOTIFIED,
an);
msg = "The observed attribute type is not valid.";
MONITOR_LOGGER.
logp(
Level.
FINEST,
Monitor.class.
getName(), "monitor",
msg);
}
}
}
// Check that threshold type is supported by this monitor.
//
if (
msg == null) {
if (!
isThresholdTypeValid(
object,
attribute,
value)) {
if (
isAlreadyNotified(
o,
THRESHOLD_ERROR_NOTIFIED))
return;
else {
notifType =
THRESHOLD_ERROR;
setAlreadyNotified(
o,
index,
THRESHOLD_ERROR_NOTIFIED,
an);
msg = "The threshold type is not valid.";
MONITOR_LOGGER.
logp(
Level.
FINEST,
Monitor.class.
getName(), "monitor",
msg);
}
}
}
// Let someone subclassing the monitor to perform additional
// monitor consistency checks and report errors if necessary.
//
if (
msg == null) {
msg =
buildErrorNotification(
object,
attribute,
value);
if (
msg != null) {
if (
isAlreadyNotified(
o,
RUNTIME_ERROR_NOTIFIED))
return;
else {
notifType =
RUNTIME_ERROR;
setAlreadyNotified(
o,
index,
RUNTIME_ERROR_NOTIFIED,
an);
MONITOR_LOGGER.
logp(
Level.
FINEST,
Monitor.class.
getName(), "monitor",
msg);
}
}
}
// If no errors were found then clear all error flags and
// let the monitor decide if a notification must be sent.
//
if (
msg == null) {
// Clear all already notified flags.
//
resetAllAlreadyNotified(
o,
index,
an);
// Get derived gauge from comparable value.
//
derGauge =
getDerivedGaugeFromComparable(
object,
attribute,
value);
o.
setDerivedGauge(
derGauge);
o.
setDerivedGaugeTimeStamp(
System.
currentTimeMillis());
// Check if an alarm must be fired.
//
alarm =
buildAlarmNotification(
object,
attribute,
(
Comparable<?>)
derGauge);
}
}
// Notify monitor errors
//
if (
msg != null)
sendNotification(
notifType,
System.
currentTimeMillis(),
msg,
derGauge,
trigger,
object,
true);
// Notify monitor alarms
//
if (
alarm != null &&
alarm.
getType() != null)
sendNotification(
alarm.
getType(),
System.
currentTimeMillis(),
alarm.
getMessage(),
derGauge,
alarm.
getTrigger(),
object,
false);
}
/**
* Cleanup the scheduler and monitor tasks futures.
*/
private synchronized void
cleanupFutures() {
if (
schedulerFuture != null) {
schedulerFuture.
cancel(false);
schedulerFuture = null;
}
if (
monitorFuture != null) {
monitorFuture.
cancel(false);
monitorFuture = null;
}
}
/**
* Cleanup the "is complex type attribute" info.
*/
private synchronized void
cleanupIsComplexTypeAttribute() {
firstAttribute = null;
remainingAttributes.
clear();
isComplexTypeAttribute = false;
}
/**
* SchedulerTask nested class: This class implements the Runnable interface.
*
* The SchedulerTask is executed periodically with a given fixed delay by
* the Scheduled Executor Service.
*/
private class
SchedulerTask implements
Runnable {
private
MonitorTask task;
/*
* ------------------------------------------
* CONSTRUCTORS
* ------------------------------------------
*/
public
SchedulerTask() {
}
/*
* ------------------------------------------
* GETTERS/SETTERS
* ------------------------------------------
*/
public void
setMonitorTask(
MonitorTask task) {
this.
task =
task;
}
/*
* ------------------------------------------
* PUBLIC METHODS
* ------------------------------------------
*/
public void
run() {
synchronized (
Monitor.this) {
Monitor.this.
monitorFuture =
task.
submit();
}
}
}
/**
* MonitorTask nested class: This class implements the Runnable interface.
*
* The MonitorTask is executed periodically with a given fixed delay by the
* Scheduled Executor Service.
*/
private class
MonitorTask implements
Runnable {
private
ThreadPoolExecutor executor;
/*
* ------------------------------------------
* CONSTRUCTORS
* ------------------------------------------
*/
public
MonitorTask() {
// Find out if there's already an existing executor for the calling
// thread and reuse it. Otherwise, create a new one and store it in
// the executors map. If there is a SecurityManager, the group of
// System.getSecurityManager() is used, else the group of the thread
// instantiating this MonitorTask, i.e. the group of the thread that
// calls "Monitor.start()".
SecurityManager s =
System.
getSecurityManager();
ThreadGroup group = (
s != null) ?
s.
getThreadGroup() :
Thread.
currentThread().
getThreadGroup();
synchronized (
executorsLock) {
for (
ThreadPoolExecutor e :
executors.
keySet()) {
DaemonThreadFactory tf =
(
DaemonThreadFactory)
e.
getThreadFactory();
ThreadGroup tg =
tf.
getThreadGroup();
if (
tg ==
group) {
executor =
e;
break;
}
}
if (
executor == null) {
executor = new
ThreadPoolExecutor(
maximumPoolSize,
maximumPoolSize,
60L,
TimeUnit.
SECONDS,
new
LinkedBlockingQueue<
Runnable>(),
new
DaemonThreadFactory("ThreadGroup<" +
group.
getName() + "> Executor",
group));
executor.
allowCoreThreadTimeOut(true);
executors.
put(
executor, null);
}
}
}
/*
* ------------------------------------------
* PUBLIC METHODS
* ------------------------------------------
*/
public
Future<?>
submit() {
return
executor.
submit(this);
}
public void
run() {
final
ScheduledFuture<?>
sf;
final
AccessControlContext ac;
synchronized (
Monitor.this) {
sf =
Monitor.this.
schedulerFuture;
ac =
Monitor.this.
acc;
}
PrivilegedAction<
Void>
action = new
PrivilegedAction<
Void>() {
public
Void run() {
if (
Monitor.this.
isActive()) {
final int
an[] =
alreadyNotifieds;
int
index = 0;
for (
ObservedObject o :
Monitor.this.
observedObjects) {
if (
Monitor.this.
isActive()) {
Monitor.this.
monitor(
o,
index++,
an);
}
}
}
return null;
}
};
if (
ac == null) {
throw new
SecurityException("AccessControlContext cannot be null");
}
AccessController.
doPrivileged(
action,
ac);
synchronized (
Monitor.this) {
if (
Monitor.this.
isActive() &&
Monitor.this.
schedulerFuture ==
sf) {
Monitor.this.
monitorFuture = null;
Monitor.this.
schedulerFuture =
scheduler.
schedule(
Monitor.this.
schedulerTask,
Monitor.this.
getGranularityPeriod(),
TimeUnit.
MILLISECONDS);
}
}
}
}
/**
* Daemon thread factory used by the monitor executors.
* <P>
* This factory creates all new threads used by an Executor in
* the same ThreadGroup. If there is a SecurityManager, it uses
* the group of System.getSecurityManager(), else the group of
* the thread instantiating this DaemonThreadFactory. Each new
* thread is created as a daemon thread with priority
* Thread.NORM_PRIORITY. New threads have names accessible via
* Thread.getName() of "{@literal JMX Monitor <pool-name> Pool [Thread-M]}",
* where M is the sequence number of the thread created by this
* factory.
*/
private static class
DaemonThreadFactory implements
ThreadFactory {
final
ThreadGroup group;
final
AtomicInteger threadNumber = new
AtomicInteger(1);
final
String namePrefix;
static final
String nameSuffix = "]";
public
DaemonThreadFactory(
String poolName) {
SecurityManager s =
System.
getSecurityManager();
group = (
s != null) ?
s.
getThreadGroup() :
Thread.
currentThread().
getThreadGroup();
namePrefix = "JMX Monitor " +
poolName + " Pool [Thread-";
}
public
DaemonThreadFactory(
String poolName,
ThreadGroup threadGroup) {
group =
threadGroup;
namePrefix = "JMX Monitor " +
poolName + " Pool [Thread-";
}
public
ThreadGroup getThreadGroup() {
return
group;
}
public
Thread newThread(
Runnable r) {
Thread t = new
Thread(
group,
r,
namePrefix +
threadNumber.
getAndIncrement() +
nameSuffix,
0);
t.
setDaemon(true);
if (
t.
getPriority() !=
Thread.
NORM_PRIORITY)
t.
setPriority(
Thread.
NORM_PRIORITY);
return
t;
}
}
}