使用jmx client监控activemq
2012-07-21 22:18:36   来源:   评论:0 点击:

笔者所在的公司使用了小部分的activemq,在使用过程中由于消费者的性能问题出现过几次queue堆满的现象。基于此,笔者写了几个小程序来监控activemq的使用情况,其原理是基于jmx client.

ActiveMQ 是Apache出品,最流行的,能力强劲的开源消息总线。ActiveMQ 是一个完全支持JMS1.1和J2EE 1.4规范的 JMS Provider实现,尽管JMS规范出台已经是很久的事情了,但是JMS在当今的J2EE应用中间仍然扮演着特殊的地位。

正常情况下activemq不会出现任何性能问题,但是在durable的topic模式下,任何一个消费者产生问题都会是致命的,因为这个时候topic不能够被删除,内存会迅速升高。

这里有两方面需要监控,一个是出现了问题之后,究竟是哪一个consumer出现了故障;另外一个就是对于不同类型的topic(queue), 我们究竟如何来实现自定义的报警和阀值?

对于第一个问题,笔者写了一个简单的基于jmx client的shell:
 



#!/bin/bash
queueName=$1
if [ -n "$queueName" ]
then
  cmd="java -jar /usr/local/activemq/bin/cmdline-jmxclient-0.10.3.jar - localhost:1099"
  for i in `java -jar /usr/local/activemq/bin/cmdline-jmxclient-0.10.3.jar - localhost:1099 |grep 'Type=Subscription' |grep 'destinationType=Topic' |grep "de
stinationName=$queueName"`
  do
  echo "#####################A new QUEUE################"
  echo $i
  $cmd $i DiscardedCount
  $cmd $i MaximumPendingQueueSize
  $cmd $i Priority
  $cmd $i Active
  $cmd $i ConnectionId
  $cmd $i Selector
  $cmd $i PrefetchSize
  $cmd $i ClientId
  $cmd $i Durable
  $cmd $i DestinationName
  $cmd $i MaximumPendingMessageLimit
  $cmd $i Exclusive
  $cmd $i Retroactive
  $cmd $i PendingQueueSize
  $cmd $i DispatchedQueueSize
  $cmd $i DispatchedCounter
  $cmd $i EnqueueCounter
  $cmd $i DequeueCounter
  $cmd $i NoLocal
  $cmd $i SessionId
  $cmd $i SubcriptionId
  $cmd $i DestinationQueue
  $cmd $i DestinationTopic
  $cmd $i DestinationTemporary
  $cmd $i SubcriptionName
  $cmd $i MessageCountAwaitingAcknowledge
  done
fi



对于第二种问题,笔者写了一段基于配置文件的py程序,由于程序比较简单,下面是代码和配置范例:

#!/usr/bin/python
# -*- coding: utf-8 -*-
# Written by Eric to monitor activemq status

import os
import re
import subprocess
import logging.handlers

# Alarm threshold is default 80% memory usage for each queue or topic
MEMORY_THRESHOLD = 80

# lists to store use defined threshold
userqueues=[]
userqtypes=[]
userthresholds=[]

# lists to store activemq queues and topics
allqueues=[]
alltopics=[]

# initial returncode, we will check all queues and topics and add up +1 if anyone exceeds the threshold
returncode = 0

# LOGFILE and CONFIGURE FILE
LOG_FILE = "/home/logs/activeMQ/monitor_activemq.log"
CONF_FILE = "/root/monitor_activemq.conf"
LOCK_FILE = "/home/logs/activeMQ/monitor_activemq.lock"

# python logger module
logger = logging.getLogger()
hdlr = logging.handlers.RotatingFileHandler(LOG_FILE, maxBytes=1000000, backupCount=5)
#set format
formatter = logging.Formatter('[%(asctime)s]%(levelname)-8s"%(message)s"','%Y-%m-%d %a %H:%M:%S')
hdlr.setFormatter(formatter)
logger.addHandler(hdlr)
#set log level
logger.setLevel(logging.CRITICAL)
#logger.setLevel(logging.INFO)
 


# Get user defined threshold, the default is 80%
# Configure file:
# cdn.dc.groupevent.0,queue,50
# cdn.dc.groupevent.*,queue,60
# cdn.dc.>,queue,70
# 以最后一项符合的为准, .*为当前目录匹配,.>为所有子目录匹配,其余为完全匹配
def get_userdef():
    result = os.path.exists(CONF_FILE)
    if result:
        lines = os.popen("cat %s" % CONF_FILE).readlines()
        for line in lines:
            line=line.strip()
            if line == "":
                continue
            userqueue,userqtype,userthreshold = line.split(",")
            userqueues.append(userqueue)
            userqtypes.append(userqtype)
            userthresholds.append(userthreshold)
 
# the function is used to get the real threshold based on users configruation file
def get_threshold(name,qtype):
    tmpresult = MEMORY_THRESHOLD
    for i in range(len(userqueues)):
        if userqtypes[i] == qtype:
            re_obj1 = re.compile(r'(.+?)>$')
            re_obj2 = re.compile(r'(.+?)\*$')
            re_obj5 = re.compile(r'Destination='+userqueues[i]+',Type=')
            match1 = re_obj1.search(userqueues[i])
            match2 = re_obj2.search(userqueues[i])
            if match1:
                prefix = match1.groups()[0]
                re_obj3 = re.compile(r'Destination='+prefix+'.+?,Type')
                if re_obj3.search(name):
                    tmpresult = userthresholds[i]
            elif match2:
                prefix = match2.groups()[0]
                re_obj4 = re.compile(r'Destination='+prefix+'[^\.]+?,Type')
                if re_obj4.search(name):
                    tmpresult = userthresholds[i]
            elif re_obj5.search(name):
                tmpresult = userthresholds[i]
    return int(tmpresult)

if __name__ == "__main__":
    os.popen("touch %s" % LOCK_FILE)
    # set PATH env, as we need at least v1.6 java to get result
    env_path = os.getenv('PATH')
    env_path = "%s:%s" % ("/usr/local/jdk/bin:/usr/local/java/bin",env_path)
    # print env_path

    get_userdef()

    queuecmd = '''java -jar /usr/local/activemq/bin/cmdline-jmxclient-0.10.3.jar - localhost:1099|grep -v Advisory|egrep "Type=Queue$"'''
    topiccmd   = '''java -jar /usr/local/activemq/bin/cmdline-jmxclient-0.10.3.jar - localhost:1099|grep -v Advisory|egrep "Type=Topic$"'''
    allqueues = subprocess.Popen(queuecmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, shell=True, env={'PATH': env_path}).communicate()[0]
    alltopics = subprocess.Popen(topiccmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, shell=True, env={'PATH': env_path}).communicate()[0]
    #print allqueues
    #print alltopics
    #print userqueues

    # checking queues
    for tmpqueue in allqueues.split("\n"):
        tmpqueue = tmpqueue.strip()
        if tmpqueue == "":
            continue
        memorycmd = '''java -jar /usr/local/activemq/bin/cmdline-jmxclient-0.10.3.jar - localhost:1099 %s MemoryPercentUsage''' % tmpqueue
        #print memorycmd
        memoryusage = subprocess.Popen(memorycmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, shell=True, env={'PATH': env_path}).communicate()[1]
        memoryusage = re.search("MemoryPercentUsage: \d*",memoryusage).group(0).split(":")[1]
        memoryusage = int(memoryusage.strip())
        threshold = get_threshold(tmpqueue,"queue")
        if memoryusage>threshold:
            logger.critical("%s memory usage: %d larger than threshold %d" % (tmpqueue, memoryusage,threshold))
            returncode = returncode + 1
        else:
            logger.info("%s memory usage: %d smaller than threshold %d" % (tmpqueue, memoryusage,threshold))
   
        # checking topics
    for tmptopic in alltopics.split("\n"):
        tmptopic = tmptopic.strip()
        if tmptopic == "":
            continue
        memorycmd = '''java -jar /usr/local/activemq/bin/cmdline-jmxclient-0.10.3.jar - localhost:1099 %s MemoryPercentUsage''' % tmptopic
        #print memorycmd
        memoryusage = subprocess.Popen(memorycmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, shell=True, env={'PATH': env_path}).communicate()[1]
        memoryusage = re.search("MemoryPercentUsage: \d*",memoryusage).group(0).split(":")[1]
        memoryusage = int(memoryusage.strip())
        threshold = get_threshold(tmptopic,"topic")
        if memoryusage>threshold:
            logger.critical("%s memory usage: %d larger than threshold %d" % (tmptopic, memoryusage,threshold))
            returncode = returncode + 1
        else:
            logger.info("%s memory usage: %d smaller than threshold %d" % (tmptopic, memoryusage,threshold))
    os.popen("rm -f %s" % LOCK_FILE)
    #print returncode
    os.popen("echo %s > /home/logs/activeMQ/monitor_activemq.result" % returncode)

配置文件中定义了名称,类型和阀值:
[root@t ~]# cat /root/monitor_activemq.conf
amq.dc.>,queue,80
amq.dc.>,topic,80
amq.monitor.>,topic,20
amq.monitor.>,queue,100
amq.monitor.59.*,queue,5

相关热词搜索:jmx client 监控 activemq

上一篇:Mysql数据库复制延时分析
下一篇:定位和消除额外的oracle数据库延迟

分享到: 收藏
iTechClub广告