Thursday, 12 March 2015

How to Register an MBean That Talks to Coherence Using a Web Application

I used the http://hilite.me/ website to convert the Java source to HTML.
We are using WebLogic 11g, which has built-in support for Coherence 3.7.

Thanks to my colleague Paul Bently for coming up with the solution.


weblogic.xml

<?xml version = '1.0' encoding = 'windows-1252'?>
<!-- WebLogic-specific deployment descriptor for the web application. -->
<weblogic-web-app
        xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
        xsi:schemaLocation="http://www.bea.com/ns/weblogic/weblogic-web-app
            http://www.bea.com/ns/weblogic/weblogic-web-app/1.0/weblogic-web-app.xsd"
        xmlns="http://www.bea.com/ns/weblogic/weblogic-web-app">
    <!-- Context root of the application. -->
    <context-root>TestExample</context-root>
</weblogic-web-app>


web.xml

<?xml version="1.0" encoding="UTF-8"?>
<!-- Servlet 2.5 deployment descriptor: declares ExampleAdmin both as a listener and as a servlet. -->
<web-app version="2.5" xmlns="http://java.sun.com/xml/ns/javaee"
 xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
 xsi:schemaLocation="http://java.sun.com/xml/ns/javaee http://java.sun.com/xml/ns/javaee/web-app_2_5.xsd">

  <!-- ExampleAdmin implements ServletContextListener; its contextInitialized/contextDestroyed
       callbacks register and unregister the MBean. -->
  <listener>
   <listener-class>ExampleAdmin</listener-class>
 </listener>

 <!-- NOTE(review): this parameter is not read by any code shown here - confirm it is still needed. -->
 <context-param>
  <param-name>shutdown-on-unload</param-name>
  <param-value>true</param-value>
 </context-param>

 <!-- NOTE(review): this parameter is not read by any code shown here - confirm it is still needed. -->
 <context-param>
  <param-name>start-scheduler-on-load</param-name>
  <param-value>true</param-value>
 </context-param>

 <!-- The same class is also declared as a servlet; it only serves an informational text page. -->
 <servlet>
  <servlet-name>ExampleAdmin</servlet-name>
  <servlet-class>ExampleAdmin</servlet-class>
 </servlet>

 <!-- "/" maps ExampleAdmin as the default servlet of the application. -->
 <servlet-mapping>
  <servlet-name>ExampleAdmin</servlet-name>
  <url-pattern>/</url-pattern>
 </servlet-mapping>
</web-app>

CacheControllerMBean.java

/**
 * Management interface listing the cache administration calls exposed over JMX.
 * Every call reports its outcome as plain text.
 */
public interface CacheControllerMBean {

    /**
     * Lists the caches that can be administered.
     * @return the cache names as text.
     */
    String getCacheNames();

    /**
     * Reports the content of a cache.
     * @param name the cache name.
     * @return the cache content as text.
     */
    String getCacheData(String name);

    /**
     * Reports a single entry of a cache.
     * @param name the cache name.
     * @param key the entry key.
     * @return the entry as text.
     */
    String getValue(String name, String key);

    /**
     * Refreshes a cache.
     * @param name the cache name.
     * @return a status message.
     */
    String refreshCache(String name);

    /**
     * Refreshes a single entry of a cache.
     * @param name the cache name.
     * @param key the entry key.
     * @return a status message.
     */
    String refreshCacheValue(String name, String key);
}


ExampleAdmin.java

/**
 * Servlet that doubles as a {@link ServletContextListener} so that the cache controller MBean
 * is registered when the web application is deployed and unregistered when it is undeployed.
 * The servlet itself only serves a short explanatory text page.
 */
public class ExampleAdmin extends HttpServlet implements ServletContextListener {

    private static final long serialVersionUID = 1L;

    private static final String TEXT_PLAIN = "text/plain";
    private static final String OBJECT_NAME =
            "co.example:type=co.example.ExampleAdminService,name=ExampleAdmin";
    private static final String JMX_RUNTIME_JNDI = "java:comp/env/jmx/runtime";

    private static final Logger LOGGER = LoggerFactory.getLogger(ExampleAdmin.class);

    /**
     * Unregisters the MBean when the web application is shut down.
     * Failures are logged and never propagated to the container.
     * @param servletContextEvent the context event (unused).
     */
    @Override
    public void contextDestroyed(final ServletContextEvent servletContextEvent) {
        LOGGER.info("Context destroyed");
        try {
            final MBeanServer server = lookupMBeanServer();
            final ObjectName name = new ObjectName(OBJECT_NAME);

            // Registration may have failed at start-up; only unregister what is really there
            if (server.isRegistered(name)) {
                server.unregisterMBean(name);
            }
        } catch (final Exception e) {
            LOGGER.error("Error unregistering ExampleAdminMXBean", e);
        }
    }

    /**
     * Registers the MBean when the web application is started.
     * Failures are logged and never propagated to the container.
     * @param servletContextEvent the context event (unused).
     */
    @Override
    public void contextInitialized(final ServletContextEvent servletContextEvent) {
        LOGGER.info("Context initialised");
        try {
            final MBeanServer server = lookupMBeanServer();
            final ObjectName name = new ObjectName(OBJECT_NAME);

            // A bean left over from an earlier deployment would make registerMBean fail
            if (server.isRegistered(name)) {
                server.unregisterMBean(name);
            }
            server.registerMBean(new CacheController(CacheControllerMBean.class), name);
        } catch (final Exception e) {
            // Pass the exception itself so that the stack trace is logged
            LOGGER.error("Error registering ExampleAdminMXBean", e);
        }
    }

    /**
     * Writes a fixed plain-text explanation of why this servlet exists.
     * @param request the request (unused).
     * @param response the response the text is written to.
     * @throws IOException if the response cannot be written.
     */
    @Override
    public void doGet(final HttpServletRequest request, final HttpServletResponse response) throws IOException {
        response.setContentType(TEXT_PLAIN);
        final PrintWriter writer = response.getWriter();
        try {
            writer.write("This servlet is just a vehicle for registering and deregietering the ConfigurationAdminMBean\n");
            writer.write("on deployment and undeployment of the appication.\n");
            writer.write("With EJB 3.1 / Fusion 12c the @Startup annotation can be used to provide registration callback\n");
            writer.write("and it will no longer be necessary to implement this as a servelt.\n");
            writer.write("Alternatively a proprietary Weblogic ApplicationLifecycleListener could have be implemented.\n");
        } finally {
            writer.flush();
            writer.close();
        }
    }

    /**
     * Handles POST exactly like GET.
     * @param request the request (unused).
     * @param response the response the text is written to.
     * @throws IOException if the response cannot be written.
     */
    @Override
    public void doPost(final HttpServletRequest request, final HttpServletResponse response) throws IOException {
        doGet(request, response);
    }

    /**
     * Looks up the MBean server bound in JNDI and releases the naming context afterwards.
     * @return the MBean server.
     * @throws Exception if the lookup fails.
     */
    private MBeanServer lookupMBeanServer() throws Exception {
        final InitialContext ctx = new InitialContext();
        try {
            return (MBeanServer) ctx.lookup(JMX_RUNTIME_JNDI);
        } finally {
            ctx.close();
        }
    }

}

Metadata.java

/**
 * Static lookup of the SQL statement registered for each cache name.
 */
public final class Metadata {

    public static final String EXAMPLE_CONFIGURATION = "EXAMPLE_CONFIGURATION";

    public static final Map<String, String> MAP = createSqlByCacheName();

    /**
     * Not instantiable: this class only holds static members.
     */
    private Metadata() {
        throw new IllegalStateException("This class cannot be constructed");
    }

    /**
     * Builds the cache name to SQL statement map.
     * @return the populated map.
     */
    private static Map<String, String> createSqlByCacheName() {
        final Map<String, String> sqlByCacheName = new HashMap<String, String>();
        sqlByCacheName.put(EXAMPLE_CONFIGURATION, "SELECT c.value FROM dummy c WHERE c.name = ?");
        return sqlByCacheName;
    }

}


import javax.management.MBeanAttributeInfo;
import javax.management.MBeanInfo;
import javax.management.MBeanOperationInfo;
import javax.management.MBeanParameterInfo;
import javax.management.NotCompliantMBeanException;
import javax.management.StandardMBean;
import com.tangosol.net.CacheFactory;
import com.tangosol.net.CacheService;
import com.tangosol.net.Cluster;
import com.tangosol.net.NamedCache;
import com.tangosol.net.Service;

/**
 * JMX MBean implementation that provides cache administration commands.
 * Besides the operations of {@link CacheControllerMBean} it overrides the {@link StandardMBean}
 * description hooks so that JMX consoles show readable descriptions and parameter names.
 */
public class CacheController extends StandardMBean implements CacheControllerMBean {

    private static final String CACHE_KEY = "Cache Key";
    private static final String CACHE_NAME = "Cache Name";
    private static final String CACHE_NAMES = "CacheNames";
    private static final String CACHE_REFRESHED = "Cache refreshed";
    private static final String GET_CACHE_DATA = "getCacheData";
    private static final String GETS_A_VALUE_FROM_THE_CACHE_FORCING_A_DATABASE_LOAD_IF_IT_IS_NOT_IN_THE_CACHE =
            "Gets a value from the cache forcing a database load if it is not in the cache";
    private static final String GETS_THE_ALL_DATA_CURRENTLY_LOADED_IN_A_CACHE = "Gets the all data currently loaded in a cache";
    private static final String GET_VALUE = "getValue";
    private static final String REFRESHES_A_SINGLE_CACHE_VALUE = "Refreshes a single cache value";
    private static final String REFRESHES_ALL_VALUES_CURRENTLY_LOADED_IN_A_CACHE =
            "Refreshes all values currently loaded in a cache";
    private static final String REFRESH_CACHE = "refreshCache";
    private static final String REFRESH_CACHE_VALUE = "refreshCacheValue";
    private static final String THE_NAMES_OF_ALL_AVAILABLE_CACHES = "The names of all available caches";
    private static final String UNABLE_TO_REFRESH_CACHE_REPOPULATION_SQL_UNAVAILABLE =
            "Unable to refresh cache repopulation SQL unavailable";

    private static final Logger LOGGER = LoggerFactory.getLogger(CacheController.class);

    // Attribute name -> description shown in the JMX console
    private final Map<String, String> attributeDescriptionMap = new HashMap<String, String>();
    // Operation name -> description shown in the JMX console
    private final Map<String, String> operationDescriptionMap = new HashMap<String, String>();
    // Operation name -> display names of its parameters, in declaration order
    private final Map<String, String[]> operationParameterNameMap = new HashMap<String, String[]>();

    /**
     * Constructs the CacheController.
     * @param mbeanInterface the interface class.
     * @throws NotCompliantMBeanException if there is an error constructing the class.
     */
    public CacheController(final Class<?> mbeanInterface) throws NotCompliantMBeanException {
        super(mbeanInterface);

        attributeDescriptionMap.put(CACHE_NAMES, THE_NAMES_OF_ALL_AVAILABLE_CACHES);

        operationDescriptionMap.put(GET_CACHE_DATA, GETS_THE_ALL_DATA_CURRENTLY_LOADED_IN_A_CACHE);
        operationDescriptionMap.put(GET_VALUE, GETS_A_VALUE_FROM_THE_CACHE_FORCING_A_DATABASE_LOAD_IF_IT_IS_NOT_IN_THE_CACHE);
        operationDescriptionMap.put(REFRESH_CACHE, REFRESHES_ALL_VALUES_CURRENTLY_LOADED_IN_A_CACHE);
        operationDescriptionMap.put(REFRESH_CACHE_VALUE, REFRESHES_A_SINGLE_CACHE_VALUE);

        operationParameterNameMap.put(GET_CACHE_DATA, new String[]{CACHE_NAME});
        operationParameterNameMap.put(GET_VALUE, new String[]{CACHE_NAME, CACHE_KEY});
        operationParameterNameMap.put(REFRESH_CACHE, new String[]{CACHE_NAME});
        operationParameterNameMap.put(REFRESH_CACHE_VALUE, new String[]{CACHE_NAME, CACHE_KEY});
    }

    /**
     * Lists every entry of a cache as one "key:value" line per entry.
     * Keys are sorted by natural order when possible, otherwise listed unsorted.
     * @param name the cache name.
     * @return the entries, each terminated by a newline.
     */
    @Override
    @SuppressWarnings({"unchecked", "rawtypes" })
    public String getCacheData(final String name) {
        LOGGER.info("getData " + name);

        final NamedCache namedCache = CacheFactory.getCache(name);
        final List<Object> orderedKeys = new ArrayList<Object>(namedCache.keySet());
        try {
            // The raw view is needed because the keys are only known to be Objects
            Collections.sort((List) orderedKeys);
        } catch (final RuntimeException e) {
            // One of the objects we are trying to sort may not implement comparable
            // Just display them unsorted
            LOGGER.error("error sorting keys", e);
        }
        final StringBuilder result = new StringBuilder();
        for (final Object key : orderedKeys) {
            result.append(key).append(':').append(namedCache.get(key)).append('\n');
        }

        return result.toString();
    }

    /**
     * Lists the names of the caches of every cache service running in the cluster.
     * @return the cache names, each followed by a single space.
     */
    @Override
    public String getCacheNames() {
        LOGGER.info("getCacheNames");

        final StringBuilder result = new StringBuilder();
        final Cluster cluster = CacheFactory.ensureCluster();
        for (final Enumeration<String> services = cluster.getServiceNames(); services.hasMoreElements();) {
            final String serviceName = services.nextElement();
            final Service service = cluster.getService(serviceName);
            if (service instanceof CacheService) {
                final CacheService cacheService = (CacheService) service;
                for (final Enumeration<String> caches = cacheService.getCacheNames(); caches.hasMoreElements();) {
                    result.append(caches.nextElement()).append(' ');
                }
            }
        }
        return result.toString();
    }

    /**
     * Gets a single cache entry formatted as "key:value".
     * @param name the cache name.
     * @param key the entry key.
     * @return the formatted entry.
     */
    @Override
    public String getValue(final String name, final String key) {
        LOGGER.info("getValue " + name + " " + key);

        return key + ":" + CacheFactory.getCache(name).get(key);
    }

    /**
     * Refreshes every row currently held in a cache.
     * @param name the cache name, which must have SQL registered in {@link Metadata#MAP}.
     * @return a status message, or the failure message if the refresh failed.
     */
    @Override
    public String refreshCache(final String name) {
        LOGGER.info("refreshCache " + name);

        final String sql = Metadata.MAP.get(name);
        if (sql == null) {
            return UNABLE_TO_REFRESH_CACHE_REPOPULATION_SQL_UNAVAILABLE;
        }
        try {
            new RefreshableDatabaseCache(CacheFactory.getCache(name), sql).refreshCache();
        } catch (final Exception e) {
            LOGGER.error("error refreshing cache " + name, e);
            return failureMessage(e);
        }
        return CACHE_REFRESHED;
    }

    /**
     * Refreshes a single row of a cache.
     * @param name the cache name, which must have SQL registered in {@link Metadata#MAP}.
     * @param key the key of the row to refresh.
     * @return a status message, or the failure message if the refresh failed.
     */
    @Override
    public String refreshCacheValue(final String name, final String key) {
        LOGGER.info("refreshCacheValue " + name + " " + key);

        final String sql = Metadata.MAP.get(name);
        if (sql == null) {
            // Without SQL there is nothing to run; do not touch the cache at all
            return UNABLE_TO_REFRESH_CACHE_REPOPULATION_SQL_UNAVAILABLE;
        }
        try {
            new RefreshableDatabaseCache(CacheFactory.getCache(name), sql).refreshCache(key);
        } catch (final Exception e) {
            LOGGER.error("error refreshing cache " + name, e);
            return failureMessage(e);
        }
        return CACHE_REFRESHED;
    }

    /**
     * Supplies the description of this MBean.
     * @param mBeanInfo the default MBean information.
     * @return the description.
     */
    @Override
    public String getDescription(final MBeanInfo mBeanInfo) {
        return "MBean for managing cached data";
    }

    /**
     * Supplies the description of an attribute, falling back to the default when none is configured.
     * @param mBeanAttributeInfo the default attribute information.
     * @return the description.
     */
    @Override
    public String getDescription(final MBeanAttributeInfo mBeanAttributeInfo) {
        final String description = attributeDescriptionMap.get(mBeanAttributeInfo.getName());
        return description == null ? super.getDescription(mBeanAttributeInfo) : description;
    }

    /**
     * Supplies the description of an operation, falling back to the default when none is configured.
     * @param mBeanOperationInfo the default operation information.
     * @return the description.
     */
    @Override
    public String getDescription(final MBeanOperationInfo mBeanOperationInfo) {
        final String description = operationDescriptionMap.get(mBeanOperationInfo.getName());
        return description == null ? super.getDescription(mBeanOperationInfo) : description;
    }

    /**
     * Supplies the display name of an operation parameter, falling back to the default
     * when none is configured for that operation or position.
     * @param mBeanOperationInfo the default operation information.
     * @param mBeanParameterInfo the default parameter information.
     * @param sequence the zero-based position of the parameter.
     * @return the parameter name.
     */
    @Override
    public String getParameterName(final MBeanOperationInfo mBeanOperationInfo,
            final MBeanParameterInfo mBeanParameterInfo, final int sequence) {
        final String[] parameterNames = operationParameterNameMap.get(mBeanOperationInfo.getName());
        final boolean configured = parameterNames != null && sequence >= 0 && sequence < parameterNames.length;
        return configured ? parameterNames[sequence]
                : super.getParameterName(mBeanOperationInfo, mBeanParameterInfo, sequence);
    }

    /**
     * Produces a non-null text for a failure so that the JMX client always sees a reason.
     * @param e the failure.
     * @return the exception message, or its string form when it has no message.
     */
    private static String failureMessage(final Exception e) {
        final String message = e.getMessage();
        return message == null ? e.toString() : message;
    }

}

DatabaseCache.java

import com.tangosol.net.NamedCache;

/**
 * Database table cache.
 * This class caches the result of an arbitrary single-parameter SQL statement, with each
 * result set row stored as a list of column values under the key used as the bind value.
 */
public class DatabaseCache implements DataCache {

    private static final String JNDI_POOL = "jdbc/ExamplePool";
    private static final Logger LOGGER = LoggerFactory.getLogger(DatabaseCache.class);
    private static final long LOCK_WAIT = 5000L;

    // The SQL to cache data for
    private final String sql;

    // The map to store the values in
    protected final Map cache;

    // The column names mapped to their zero-based position; null until the first database read
    private Map<String, Integer> columnNames;

    /**
     * Constructs a DatabaseCache.
     * Rows are read into the cache on demand (lazy).
     * @param cacheImplementation the map in which to store the cache data.
     * @param sqlStatement the SQL to cache, which must require exactly one bind variable.
     */
    public DatabaseCache(final Map cacheImplementation, final String sqlStatement) {
        this.cache = cacheImplementation;
        this.sql = sqlStatement;
    }

    /**
     * Gets the column names in result set order.
     * @return a copy of the column names; empty if no row has been read from the database yet.
     */
    @Override
    public List<String> getColumnNames() {
        final List<String> defensiveCopy = new ArrayList<String>();
        if (columnNames != null) {
            defensiveCopy.addAll(columnNames.keySet());
        }
        return defensiveCopy;
    }

    /**
     * Gets a copy of every row whose key is currently in the cache.
     * @return the rows by key.
     */
    @Override
    public Map<Object, List<Object>> getData() {
        final Map<Object, List<Object>> defensiveCopy = new HashMap<Object, List<Object>>();
        for (final Object key : cache.keySet()) {
            defensiveCopy.put(key, getRow(key));
        }
        return defensiveCopy;
    }

    /**
     * Gets a row, loading it from the database and caching it when it is not cached yet.
     * @param key the key of the row.
     * @return a copy of the row, or null if it is not cached and could not be loaded.
     */
    @Override
    public List<Object> getRow(final Object key) {
        @SuppressWarnings("unchecked")
        List<Object> row = (List<Object>) cache.get(key);

        if (row == null) {
            try {
                row = getRowFromDB(key, true);
                cache.put(key, row);
            } catch (final Exception e) {
                LOGGER.error("Unexpected Error", e);
            }
        }

        List<Object> defensiveCopy = null;
        if (row != null) {
            defensiveCopy = new ArrayList<Object>(row.size());
            for (final Object value : row) {

                // Take a deep copy of dates which are the only mutable JDBC type
                // clone() keeps the runtime subtype and its extra state
                // Shallow copy immutable objects
                defensiveCopy.add(value instanceof Date ? ((Date) value).clone() : value);
            }
        }
        return defensiveCopy;
    }

    /**
     * Gets a single column of a row by column name.
     * @param key the key of the row.
     * @param column the column name.
     * @return the column value, or null if the row or the column is not available.
     */
    @Override
    public Object getColumn(final Object key, final String column) {
        // Read the row first: loading it is what populates the column names
        final List<Object> row = getRow(key);
        final Integer index = columnNames == null ? null : columnNames.get(column);
        return index == null ? null : valueAt(row, index.intValue());
    }

    /**
     * Gets a single column of a row by zero-based position.
     * @param key the key of the row.
     * @param column the zero-based column position.
     * @return the column value, or null if the row or the position is not available.
     */
    @Override
    public Object getColumn(final Object key, final int column) {
        return valueAt(getRow(key), column);
    }

    /**
     * Gets the row directly from the database bypassing the cache.
     * When the backing map is a NamedCache the key is locked while the row is read so that
     * only one cluster node populates a row at a time. If the lock cannot be obtained within
     * the wait time the read still proceeds and a warning is logged.
     * @param key the key to the query.
     * @param checkOnLockAcquisition check if the row has already been retrieved on lock acquisition.
     * @return the list of columns; empty if the query returned no row.
     * @throws Exception if there is an error getting the row.
     */
    @SuppressWarnings("unchecked")
    protected List<Object> getRowFromDB(final Object key, final boolean checkOnLockAcquisition) throws Exception {
        // A plain Map has no cluster lock; only a NamedCache can be locked
        final NamedCache lockableCache = cache instanceof NamedCache ? (NamedCache) cache : null;
        boolean locked = false;
        List<Object> list;
        try {
            if (lockableCache != null) {
                LOGGER.debug("locking key = " + key);

                // Only one cluster node should populate one cache row value
                locked = lockableCache.lock(key, LOCK_WAIT);
                if (!locked) {
                    LOGGER.warn("Unable to lock key = " + key + " within " + LOCK_WAIT + "ms, reading without lock");
                }
            }

            // Check to see if the row has been read while waiting for lock
            list = (List<Object>) cache.get(key);
            if (!checkOnLockAcquisition || list == null) {
                LOGGER.debug("reading row data key = " + key);
                list = queryRow(key);
                LOGGER.debug("row data " + list);
            }
        } finally {
            // Only release a lock this call actually obtained
            if (locked) {
                lockableCache.unlock(key);
            }
        }
        return list;
    }

    /**
     * Runs the SQL for one key and reads the first row of the result.
     * @param key the bind value.
     * @return the column values of the first row; empty if there is no row.
     * @throws Exception if the query fails.
     */
    private List<Object> queryRow(final Object key) throws Exception {
        Connection connection = null;
        PreparedStatement preparedStatement = null;
        ResultSet resultSet = null;
        try {
            final Context context = new InitialContext();
            final DataSource dataSource = (DataSource) context.lookup(JNDI_POOL);
            connection = dataSource.getConnection();
            preparedStatement = connection.prepareStatement(sql);
            preparedStatement.setObject(1, key);
            resultSet = preparedStatement.executeQuery();
            final ResultSetMetaData resultSetMetaData = resultSet.getMetaData();
            final int columnCount = resultSetMetaData.getColumnCount();

            // Read the column names only once
            if (columnNames == null) {
                final Map<String, Integer> names = new LinkedHashMap<String, Integer>(columnCount);
                for (int i = 0; i < columnCount; i++) {
                    names.put(resultSetMetaData.getColumnName(i + 1), i);
                }
                columnNames = names;
            }

            // Read the data
            final List<Object> list = new ArrayList<Object>(columnCount);
            if (resultSet.next()) {
                for (int i = 1; i <= columnCount; i++) {
                    list.add(resultSet.getObject(i));
                }
            }
            return list;
        } finally {
            closeResultSet(resultSet);
            closeStatement(preparedStatement);
            closeConnection(connection);
        }
    }

    /**
     * Reads one position of a row without ever throwing.
     * @param row the row, may be null.
     * @param index the zero-based position.
     * @return the value, or null if the row is null or the position is out of range.
     */
    private static Object valueAt(final List<Object> row, final int index) {
        return row != null && index >= 0 && index < row.size() ? row.get(index) : null;
    }

    /**
     * Safely closes a resultSet.
     * @param resultSet the resultSet to close.
     */
    private void closeResultSet(final ResultSet resultSet) {
        try {
            if (resultSet != null) {
                resultSet.close();
            }
        } catch (final Exception e) {
            LOGGER.error("Error closing resultSet", e);
        }
    }

    /**
     * Safely closes a statement.
     * @param statement the statement to close.
     */
    private void closeStatement(final Statement statement) {
        try {
            if (statement != null) {
                statement.close();
            }
        } catch (final Exception e) {
            LOGGER.error("Error closing statement", e);
        }
    }

    /**
     * Safely closes a connection.
     * @param connection the connection to close.
     */
    private void closeConnection(final Connection connection) {
        try {
            if (connection != null) {
                connection.close();
            }
        } catch (final Exception e) {
            LOGGER.error("Error closing connection", e);
        }
    }

}

RefreshableDatabaseCache.java

/**
 * DatabaseCache class extended to include refresh capability.
 */
public class RefreshableDatabaseCache extends DatabaseCache {

    private static final Logger LOGGER = LoggerFactory.getLogger(RefreshableDatabaseCache.class);

    /**
     * Constructs a RefreshableDatabaseCache.
     * Rows are read into the cache on demand (lazy).
     * @param cacheImplementation the map in which to store the cache data.
     * @param sqlStatement the SQL to cache, which must require exactly one bind variable.
     */
    public RefreshableDatabaseCache(final Map cacheImplementation, final String sqlStatement) {
        super(cacheImplementation, sqlStatement);
    }

    /**
     * Resolves the key actually used in the cache. Since the key passed here is likely to be a
     * String from the MBean UI but the key in the map may be something else (e.g. an Integer),
     * the stringified cache keys are compared with the stringified key when there is no exact match.
     * @param key the key as supplied by the caller.
     * @return the matching cache key, or the supplied key itself when the cache holds no match.
     */
    private Object safeGetKey(final Object key) {
        if (key == null || cache.containsKey(key)) {
            return key;
        }
        final String wanted = key.toString();
        for (final Object candidate : cache.keySet()) {
            if (candidate != null && wanted.equals(candidate.toString())) {
                return candidate;
            }
        }
        // Not cached yet: use the key as given so the row is loaded under it
        return key;
    }

    /**
     * Refreshes all the cached rows from the table.
     * Before the rows are cleared the new values are retrieved from their source.
     * If there is an error getting any of the new values the old values are not cleared.
     * The clear and the re-insert are two separate cache calls.
     * @throws Exception if there is an error refreshing the cache.
     */
    public void refreshCache() throws Exception {
        try {

            // Snapshot the keys so that loading rows never iterates a changing key set
            final Object[] keys = cache.keySet().toArray();

            // Get the new rows; the keys come from the cache so they already have the right type
            final Map<Object, List<Object>> rows = new HashMap<Object, List<Object>>();
            for (final Object key : keys) {
                final List<Object> row = getRowFromDB(key, false);
                if (row == null) {
                    throw new IllegalStateException("Failed to get row from database key = " + key);
                }
                rows.put(key, row);
            }

            // Remove the old rows
            cache.clear();

            // Insert the new rows
            cache.putAll(rows);

        } catch (final Exception e) {
            LOGGER.error("Failed to clearRow cache unchanged", e);
            throw e;
        }
    }

    /**
     * Refreshes a single cached row from a table.
     * Before the row is cleared the new value is retrieved from its source.
     * If there is an error getting the new value the old value is not cleared.
     * @param key the key for the row to clear; it is matched against the cache keys by its
     *            string form when it is not itself a cache key.
     * @throws Exception if there is an error refreshing the cache.
     */
    public void refreshCache(final Object key) throws Exception {

        try {
            // Get the new row using the key as it is really stored in the cache
            final Object realKey = safeGetKey(key);
            final List<Object> row = getRowFromDB(realKey, false);
            if (row == null) {
                throw new IllegalStateException("Failed to get row from database key = " + key);
            }

            // Remove the old row
            cache.remove(realKey);

            // Insert the new row under the same key so no stale duplicate is left behind
            cache.put(realKey, row);
        } catch (final Exception e) {
            LOGGER.error("Failed to clearRow cache unchanged", e);
            throw e;
        }
    }

}
These are the setup files required in order for Coherence to work:

Make sure your server start script sets the following properties. They can also be set via the admin console (Environment / Servers / Admin server / Server Start):
-Dtangosol.coherence.override=C:\oracle\coherence_3.7\bin\tangosol-coherence-override.xml -Dtangosol.coherence.cacheconfig=C:\oracle\Middleware\coherence_3.7\bin\cache-config.xml


tangosol-coherence-override.xml 

<?xml version='1.0'?>

<!-- Coherence operational override: names the cluster and configures unicast discovery. -->
<coherence xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
 xmlns="http://xmlns.oracle.com/coherence/coherence-operational-config"
 xsi:schemaLocation="http://xmlns.oracle.com/coherence/coherence-operational-config coherence-operational-config.xsd">
 <cluster-config>
  <!-- Name of the cluster this member belongs to. -->
  <member-identity>
   <cluster-name>examplecluster</cluster-name>
  </member-identity>

  <unicast-listener>
   <!-- Well-known addresses of the cluster.
        NOTE(review): ipaddress1/ipaddress2 look like placeholders - replace with real host addresses. -->
   <well-known-addresses>
    <socket-address id="1">
     <address>ipaddress1</address>
     <port>8088</port>
    </socket-address>
    <socket-address id="2">
     <address>ipaddress2</address>
     <port>8088</port>
    </socket-address>
   </well-known-addresses>
   <!-- Address and port of this member's unicast listener.
        NOTE(review): presumably this must be the local host's own address on each server - confirm. -->
   <address>ipaddress1</address>
   <port>8088</port>
  </unicast-listener>

 </cluster-config>
</coherence>


cache-config.xml

<?xml version="1.0"?>
<!-- Coherence cache configuration: every cache name is mapped to one replicated scheme. -->
<!-- xsi:schemaLocation must be a "namespace location" pair; a single token is not a valid value. -->
<cache-config xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
 xmlns="http://xmlns.oracle.com/coherence/coherence-cache-config"
 xsi:schemaLocation="http://xmlns.oracle.com/coherence/coherence-cache-config coherence-cache-config.xsd">
 <caching-scheme-mapping>
  <!-- "*" matches any cache name. -->
  <cache-mapping>
   <cache-name>*</cache-name>
   <scheme-name>example-replicated</scheme-name>
  </cache-mapping>
 </caching-scheme-mapping>
 <caching-schemes>
  <replicated-scheme>
   <scheme-name>example-replicated</scheme-name>
   <service-name>ReplicatedCache</service-name>
   <backing-map-scheme>
    <local-scheme>
     <scheme-name>replicated</scheme-name>
     <!-- An expiry delay of 0 means entries never expire. -->
     <expiry-delay>0</expiry-delay>
    </local-scheme>
   </backing-map-scheme>
   <autostart>true</autostart>
  </replicated-scheme>
 </caching-schemes>
</cache-config>

3 comments:

web lol said...

kul pejg

Website development services said...

Hmm it looks like your website ate my first comment (it was extremely long) so I guess I’ll just sum it up what I wrote and say, I’m thoroughly enjoying your blog. I as well am an aspiring blog blogger but I’m still new to the whole thing. Do you have any tips and hints for rookie blog writers? I’d really appreciate it.

vận chuyển hàng đi Trung Quốc said...
This comment has been removed by a blog administrator.