MBean Publisher

The MBean publisher is used to publish arbitrary case classes as DynamicMBeans via JMX. A generic mapper will examine the structure of the given case class instance and recursively map all attributes to corresponding attributes within th MBean.

As such the interface definition for the publishing service is:

trait Service {
/**
* Retrieve the list of object names that are currently registered by this service.
*/
def managedNames: ZIO[Any, Nothing, List[String]]
/**
* Create or update the MBean within JMX with the <code>DynamicMBean</code> representation of the given case class instance.
*/
def updateMBean[T <: Product](
v: T
)(implicit f: T => Nameable[T]): ZIO[Any, MBeanPublishException, Unit]
/**
* Remove the registration from JMX for the object name derived from the given case class.
*/
def removeMBean[T <: Product](v: T)(implicit f: T => Nameable[T]): ZIO[Any, MBeanPublishException, Unit]
}

Using the MBean publisher#

The easiest way to use the MBeanPublisher is to make it available through a layer like it is done within the tests:

private val logSlf4j = Slf4jLogger.make((_, message) => message)
private val mbeanLayer: ZLayer[Any, Nothing, MBeanServerFacade.MBeanServerFacade] =
logSlf4j >>> MBeanServerFacade.live

Then the MBeanPublisher can be used by simply passing a case class to updateMBean. For now the case class also needs to implement Nameable so that the proper ObjectName can be calculated.

private val simplePublish = testM("publish a simple case class")(for {
pub <- ZIO.service[ProductMBeanPublisher.Service]
fac <- ZIO.service[MBeanServerFacade.Service]
cc <- ZIO.succeed(Simple("test1", 0, "Hello Jmx"))
info <- pub.updateMBean(cc) >>> fac.mbeanInfo(objectName(cc))
} yield {
val keys = info.attributes.value.keys.toList
assert(keys)(contains("counter")) &&
assert(keys)(contains("message")) &&
assert(keys)(hasSize(equalTo(3))) &&
assert(info.attributes.value("counter").value.asInstanceOf[Int])(equalTo(0))
})

In the test case we are also using the MBeanServerFacade to verify that the MBean has been published correctly and has the correct values.

note

The implementation keeps track of all instances that have been published. Only the first call to publish will actually register the MBean while subsequent calls will only update the underlying value. The Service makes sure that updates to MBeans are only allowed for MBeans that are based on the same Class.

Implementation details#

The service implementation keeps track of the published values in a TMap with the object name as key and the DynamicMBean wrapper around the case class.

To manipulate the TMap we use some helper methods to either create or update an entry within the TMap:

private def updateMBean[T <: Product](old: OpenProductMBean, bean: T): STM[Throwable, Unit] =
STM.ifM(STM.succeed(old.beanClass.equals(bean.getClass)))(
STM.fromTry(Try {
val mapped = mapper.mapProduct(bean)
old.update(bean.getClass, mapped).get
}),
STM.fail(new IncompatibleJmxUpdateException(bean.getClass, old.beanClass))
)
private def createMBean[T <: Product](bean: T)(implicit f: T => Nameable[T]): STM[Throwable, Unit] =
STM
.fromTry(Try {
val on: ObjectName = new ObjectName(objectName(bean).objectName)
try svr.unregisterMBean(on)
catch {
case _: InstanceNotFoundException => // swallow that exception
}
val mapped = mapper.mapProduct(bean)
val b = new OpenProductMBean(bean.getClass, mapped)
svr.registerMBean(b, new ObjectName(objectName(bean).objectName))
b
})
.flatMap { b =>
self.beans.put(objectName(bean).objectName, b)
}
private def createOrUpdate[T <: Product](bean: T)(implicit f: T => Nameable[T]): STM[Throwable, Unit] =
self.beans.get(objectName(bean).objectName).flatMap {
case Some(e) => updateMBean(e, bean)
case None => createMBean(bean)
}
caution

The implementation uses STM under the covers. It is important to note that STM code should not include side effecting code that might not be idempotent (such as appending to a file or as in our case register an MBean). The reason for that is that the STM code will be retried if any of the STM-values that are being touched by the operation is changed from another fiber.

In our case the tests were failing when run in parallel because the registration in JMX might have executed multiple times, which in turn caused a JMX exception.

For now we are simply ignoring that specific JMX exception, but a better solution might be looking at another mechanism than TMap to handle that scenario.

(Also see the discussion on Discord)

With the helper methods in place, actual service implementation methods is fairly straightforward:

def updateMBean[T <: Product](
v: T
)(implicit f: T => Nameable[T]): ZIO[Logging, MBeanPublishException, Unit] =
createOrUpdate(v).commit.mapError {
case mbe: MBeanPublishException => mbe
case _: javax.management.InstanceAlreadyExistsException => new InstanceAlreadyExistsException[T](v, objectName(v))
case t => new JmxException(t)
} <* log.debug(s"updated MBean with name [${objectName(v)}}] to [$v]")
def removeMBean[T <: Product](v: T)(implicit f: T => Nameable[T]): ZIO[Logging, MBeanPublishException, Unit] =
self.beans
.get(objectName(v).objectName)
.flatMap {
case Some(_) =>
STM.fromTry(Try {
try svr.unregisterMBean(new ObjectName(objectName(v).objectName))
catch {
case _: InstanceNotFoundException => // swallow that exception as it may occur in STM retries
}
}) >>> self.beans.delete(objectName(v).objectName)
case None => STM.unit
}
.commit
.mapError {
case mbe: MBeanPublishException => mbe
case t => new JmxException(t)
} <* log.debug(s"Removed MBean with name [${objectName(v)}]")