collectd-memcached-bucket

Grab stats for individual buckets in the NorthMemCouchScaleBase bucket engine and shove them into collectd
git clone https://code.literati.org/collectd-memcached-bucket.git
Log | Files | Refs

commit 49c03d992bbe06293c4ba5c7cb78f47fc7ab677d
Author: Sean Lynch <seanl@literati.org>
Date:   Tue, 26 Jan 2010 22:35:17 +0000

initial commit

Diffstat:
Acollectd_memcached_buckets.py | 93+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
Agenload.py | 42++++++++++++++++++++++++++++++++++++++++++
Amc_bin_client.py | 240+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
AmemcacheConstants.py | 86+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
Asetup.py | 9+++++++++
Atypes.db | 1+
6 files changed, 471 insertions(+), 0 deletions(-)

diff --git a/collectd_memcached_buckets.py b/collectd_memcached_buckets.py @@ -0,0 +1,93 @@ +#!/usr/bin/python +import sys, time +sys.path = ['/usr/local/lib/python2.6/site-packages'] + sys.path + +import mc_bin_client +import pycollectd + +""" +memcached_command value:COUNTER:0:U +memcached_connections value:GAUGE:0:U +memcached_items value:GAUGE:0:U +memcached_octets rx:COUNTER:0:4294967295, tx:COUNTER:0:4294967295 +memcached_ops value:COUNTER:0:134217728 + +APPEND_STAT("cmd_get", "%"PRIu64, thread_stats.get_cmds); +APPEND_STAT("cmd_set", "%"PRIu64, slab_stats.set_cmds); +APPEND_STAT("cmd_flush", "%"PRIu64, thread_stats.flush_cmds); +APPEND_STAT("get_hits", "%"PRIu64, slab_stats.get_hits); +APPEND_STAT("get_misses", "%"PRIu64, thread_stats.get_misses); +APPEND_STAT("delete_misses", "%"PRIu64, thread_stats.delete_misses); +APPEND_STAT("delete_hits", "%"PRIu64, slab_stats.delete_hits); +APPEND_STAT("incr_misses", "%"PRIu64, thread_stats.incr_misses); +APPEND_STAT("incr_hits", "%"PRIu64, thread_stats.incr_hits); +APPEND_STAT("decr_misses", "%"PRIu64, thread_stats.decr_misses); +APPEND_STAT("decr_hits", "%"PRIu64, thread_stats.decr_hits); +APPEND_STAT("cas_misses", "%"PRIu64, thread_stats.cas_misses); +APPEND_STAT("cas_hits", "%"PRIu64, slab_stats.cas_hits); +APPEND_STAT("cas_badval", "%"PRIu64, slab_stats.cas_badval); +APPEND_STAT("bytes_read", "%"PRIu64, thread_stats.bytes_read); +APPEND_STAT("bytes_written", "%"PRIu64, thread_stats.bytes_written); + + +""" + +def get_hostname_from_collectd_config(fn): + import re + data = open(fn).read() + r = re.compile('^Hostname\s*"([^"]+)"$', re.IGNORECASE|re.MULTILINE) + m = r.search(data) + return m.group(1) if m is not None else None + +def main(): + from optparse import OptionParser + + parser = OptionParser() + parser.add_option('-H', '--hostname', dest='hostname') + parser.add_option('-m', '--memcached-host', dest='memcached_host') + parser.add_option('-u', '--memcached-username', dest='memcached_username') + parser.add_option('-p', '--memcached-password', dest='memcached_password') + + options, args = parser.parse_args() + + mc = mc_bin_client.MemcachedClient(options.memcached_host or '127.0.0.1') + if options.memcached_username: + mc.sasl_auth_cram_md5(options.memcached_username, options.memcached_password) + + hostname = options.hostname or get_hostname_from_collectd_config('/etc/collectd/collectd.conf') + assert hostname + plugin = 'memcached_bucket' + type = 'memcached_bucket' + ops = ['cmd_get', + 'cmd_set', + 'cmd_flush', + 'get_hits', + 'get_misses', + 'delete_misses', + 'delete_hits', + 'incr_misses', + 'incr_hits', + 'decr_misses', + 'decr_hits', + 'cas_misses', + 'cas_hits', + 'cas_badval', + 'bytes_read', + 'bytes_written', + 'evictions', + ] + + c = pycollectd.Exec() + l = mc.bucket_list() + now = time.time() + for bucket in sorted(l): + identifier = pycollectd.Identifier(hostname, plugin, None, type, bucket) + mc.bucket_select(bucket) + stats = mc.stats() + values = [ stats[op] for op in ops ] + c.putval(identifier, now, values) + + +if __name__ == '__main__': + main() + diff --git a/genload.py b/genload.py @@ -0,0 +1,42 @@ +#!/usr/bin/python +import random, time +import mc_bin_client + +def mkkey(): + return hex(random.randint(0, 0xffff)) + +def mkval(): + return str(random.randint(0, 10000)) + +def main(): + mc = mc_bin_client.MemcachedClient() + mc.sasl_auth_cram_md5('admin', 'nai0eeNi') + + start = time.time() + ops = 0 + while True: + key = mkkey() + val = mkval() + op = random.randint(0, 3) + try: + if op == 0: + mc.set(key, 60, 0, val) + elif op == 1: + mc.get(key) + elif op == 2: + mc.incr(key, exp=60) + elif op == 3: + mc.decr(key, exp=60) + except mc_bin_client.MemcachedError as e: + if e.status != 1: + print e.message + + ops += 1 + + if ops % 10000 == 0: + print 'ops/sec = {0}'.format(ops/(time.time() - start)) + + +if __name__ == '__main__': + main() + diff --git a/mc_bin_client.py b/mc_bin_client.py @@ -0,0 +1,240 @@ +#!/usr/bin/env python +""" +Binary memcached test client. + +Copyright (c) 2007 Dustin Sallings <dustin@spy.net> +""" + +import sys +import time +import hmac +import socket +import random +import struct +import exceptions + +from memcacheConstants import REQ_MAGIC_BYTE, RES_MAGIC_BYTE +from memcacheConstants import REQ_PKT_FMT, RES_PKT_FMT, MIN_RECV_PACKET +from memcacheConstants import SET_PKT_FMT, DEL_PKT_FMT, INCRDECR_RES_FMT +import memcacheConstants + +class MemcachedError(exceptions.Exception): + """Error raised when a command fails.""" + + def __init__(self, status, msg): + supermsg='Memcached error #' + `status` + if msg: supermsg += ": " + msg + exceptions.Exception.__init__(self, supermsg) + + self.status=status + self.msg=msg + + def __repr__(self): + return "<MemcachedError #%d ``%s''>" % (self.status, self.msg) + +class MemcachedClient(object): + """Simple memcached client.""" + + def __init__(self, host='127.0.0.1', port=11211): + self.s=socket.socket(socket.AF_INET, socket.SOCK_STREAM) + self.s.connect_ex((host, port)) + self.r=random.Random() + + def close(self): + self.s.close() + + def __del__(self): + self.close() + + def _sendCmd(self, cmd, key, val, opaque, extraHeader='', cas=0): + dtype=0 + msg=struct.pack(REQ_PKT_FMT, REQ_MAGIC_BYTE, + cmd, len(key), len(extraHeader), dtype, + len(key) + len(extraHeader) + len(val), opaque, cas) + self.s.send(msg + extraHeader + key + val) + + def _handleKeyedResponse(self, myopaque): + response = "" + while len(response) < MIN_RECV_PACKET: + response += self.s.recv(MIN_RECV_PACKET - len(response)) + assert len(response) == MIN_RECV_PACKET + magic, cmd, keylen, extralen, dtype, errcode, remaining, opaque, cas=\ + struct.unpack(RES_PKT_FMT, response) + if remaining > 0: + rv=self.s.recv(remaining) + else: + rv="" + assert magic == RES_MAGIC_BYTE, "Got magic: %d" % magic + assert myopaque is None or opaque == myopaque, \ + "expected opaque %x, got %x" % (myopaque, opaque) + if errcode != 0: + raise MemcachedError(errcode, rv) + return opaque, cas, keylen, rv + + def _handleSingleResponse(self, myopaque): + opaque, cas, keylen, data = self._handleKeyedResponse(myopaque) + return opaque, cas, data + + def _doCmd(self, cmd, key, val, extraHeader='', cas=0): + """Send a command and await its response.""" + opaque=self.r.randint(0, 2**32) + self._sendCmd(cmd, key, val, opaque, extraHeader, cas) + return self._handleSingleResponse(opaque) + + def _mutate(self, cmd, key, exp, flags, cas, val): + return self._doCmd(cmd, key, val, struct.pack(SET_PKT_FMT, flags, exp), + cas) + + def _cat(self, cmd, key, cas, val): + return self._doCmd(cmd, key, val, '', cas) + + def append(self, key, value, cas=0): + return self._cat(memcacheConstants.CMD_APPEND, key, cas, value) + + def prepend(self, key, value, cas=0): + return self._cat(memcacheConstants.CMD_PREPEND, key, cas, value) + + def __incrdecr(self, cmd, key, amt, init, exp): + something, cas, val=self._doCmd(cmd, key, '', + struct.pack(memcacheConstants.INCRDECR_PKT_FMT, amt, init, exp)) + return struct.unpack(INCRDECR_RES_FMT, val)[0], cas + + def incr(self, key, amt=1, init=0, exp=0): + """Increment or create the named counter.""" + return self.__incrdecr(memcacheConstants.CMD_INCR, key, amt, init, exp) + + def decr(self, key, amt=1, init=0, exp=0): + """Decrement or create the named counter.""" + return self.__incrdecr(memcacheConstants.CMD_DECR, key, amt, init, exp) + + def set(self, key, exp, flags, val): + """Set a value in the memcached server.""" + return self._mutate(memcacheConstants.CMD_SET, key, exp, flags, 0, val) + + def add(self, key, exp, flags, val): + """Add a value in the memcached server iff it doesn't already exist.""" + return self._mutate(memcacheConstants.CMD_ADD, key, exp, flags, 0, val) + + def replace(self, key, exp, flags, val): + """Replace a value in the memcached server iff it already exists.""" + return self._mutate(memcacheConstants.CMD_REPLACE, key, exp, flags, 0, + val) + + def __parseGet(self, data): + flags=struct.unpack(memcacheConstants.GET_RES_FMT, data[-1][:4])[0] + return flags, data[1], data[-1][4:] + + def get(self, key): + """Get the value for a given key within the memcached server.""" + parts=self._doCmd(memcacheConstants.CMD_GET, key, '') + return self.__parseGet(parts) + + def cas(self, key, exp, flags, oldVal, val): + """CAS in a new value for the given key and comparison value.""" + self._mutate(memcacheConstants.CMD_SET, key, exp, flags, + oldVal, val) + + def version(self): + """Get the value for a given key within the memcached server.""" + return self._doCmd(memcacheConstants.CMD_VERSION, '', '') + + def sasl_mechanisms(self): + """Get the supported SASL methods.""" + return set(self._doCmd(memcacheConstants.CMD_SASL_LIST_MECHS, + '', '')[2].split(' ')) + + def sasl_auth_start(self, mech, data): + """Start a sasl auth session.""" + return self._doCmd(memcacheConstants.CMD_SASL_AUTH, mech, data) + + def sasl_auth_plain(self, user, password, foruser=''): + """Perform plain auth.""" + return self.sasl_auth_start('PLAIN', '\0'.join([foruser, user, password])) + + def sasl_auth_cram_md5(self, user, password): + """Start a plan auth session.""" + try: + self.sasl_auth_start('CRAM-MD5', '') + except MemcachedError, e: + if e.status != memcacheConstants.ERR_AUTH_CONTINUE: + raise + challenge = e.msg + + dig = hmac.HMAC(password, challenge).hexdigest() + return self._doCmd(memcacheConstants.CMD_SASL_STEP, 'CRAM-MD5', + user + ' ' + dig) + + def bucket_list(self): + """List buckets.""" + s_str = self._doCmd(memcacheConstants.CMD_LIST_BUCKETS, '', '')[2] + if s_str: + s = set(s_str.split(' ')) + else: + s = set() + return s + + def bucket_create(self, name, config=""): + """Create a bucket.""" + return self._doCmd(memcacheConstants.CMD_CREATE_BUCKET, name, config) + + def bucket_delete(self, name): + """Delete a bucket.""" + return self._doCmd(memcacheConstants.CMD_DELETE_BUCKET, name, '') + + def bucket_expand(self, name, new_size): + """Create a bucket.""" + return self._doCmd(memcacheConstants.CMD_EXPAND_BUCKET, name, str(new_size)) + + def bucket_select(self, name): + return self._doCmd(memcacheConstants.CMD_SELECT_BUCKET, name, '') + + def getMulti(self, keys): + """Get values for any available keys in the given iterable. + + Returns a dict of matched keys to their values.""" + opaqued=dict(enumerate(keys)) + terminal=len(opaqued)+10 + # Send all of the keys in quiet + for k,v in opaqued.iteritems(): + self._sendCmd(memcacheConstants.CMD_GETQ, v, '', k) + + self._sendCmd(memcacheConstants.CMD_NOOP, '', '', terminal) + + # Handle the response + rv={} + done=False + while not done: + opaque, cas, data=self._handleSingleResponse(None) + if opaque != terminal: + rv[opaqued[opaque]]=self.__parseGet((opaque, cas, data)) + else: + done=True + + return rv + + def stats(self, sub=''): + """Get stats.""" + opaque=self.r.randint(0, 2**32) + self._sendCmd(memcacheConstants.CMD_STAT, sub, '', opaque) + done = False + rv = {} + while not done: + opaque, cas, klen, data = self._handleKeyedResponse(None) + if klen: + rv[data[0:klen]] = data[klen:] + else: + done = True + return rv + + def noop(self): + """Send a noop command.""" + return self._doCmd(memcacheConstants.CMD_NOOP, '', '') + + def delete(self, key, cas=0): + """Delete the value for a given key within the memcached server.""" + return self._doCmd(memcacheConstants.CMD_DELETE, key, '', '', cas) + + def flush(self, timebomb=0): + """Flush all storage in a memcached instance.""" + return self._doCmd(memcacheConstants.CMD_FLUSH, '', '', + struct.pack(memcacheConstants.FLUSH_PKT_FMT, timebomb)) diff --git a/memcacheConstants.py b/memcacheConstants.py @@ -0,0 +1,86 @@ +#!/usr/bin/env python +""" + +Copyright (c) 2007 Dustin Sallings <dustin@spy.net> +""" + +import struct + +# Command constants +CMD_GET = 0 +CMD_SET = 1 +CMD_ADD = 2 +CMD_REPLACE = 3 +CMD_DELETE = 4 +CMD_INCR = 5 +CMD_DECR = 6 +CMD_QUIT = 7 +CMD_FLUSH = 8 +CMD_GETQ = 9 +CMD_NOOP = 10 +CMD_VERSION = 11 +CMD_STAT = 0x10 +CMD_APPEND = 0x0e +CMD_PREPEND = 0x0f + +# SASL stuff +CMD_SASL_LIST_MECHS = 0x20 +CMD_SASL_AUTH = 0x21 +CMD_SASL_STEP = 0x22 + +# Bucket extension +CMD_CREATE_BUCKET = 0x25 +CMD_DELETE_BUCKET = 0x26 +CMD_LIST_BUCKETS = 0x27 +CMD_EXPAND_BUCKET = 0x28 +CMD_SELECT_BUCKET = 0x29 + +# Flags, expiration +SET_PKT_FMT=">II" + +# flags +GET_RES_FMT=">I" + +# How long until the deletion takes effect. +DEL_PKT_FMT="" + +# amount, initial value, expiration +INCRDECR_PKT_FMT=">QQI" +# Special incr expiration that means do not store +INCRDECR_SPECIAL=0xffffffff +INCRDECR_RES_FMT=">Q" + +# Time bomb +FLUSH_PKT_FMT=">I" + +MAGIC_BYTE = 0x80 +REQ_MAGIC_BYTE = 0x80 +RES_MAGIC_BYTE = 0x81 + +# magic, opcode, keylen, extralen, datatype, [reserved], bodylen, opaque, cas +REQ_PKT_FMT=">BBHBBxxIIQ" +# magic, opcode, keylen, extralen, datatype, status, bodylen, opaque, cas +RES_PKT_FMT=">BBHBBHIIQ" +# min recv packet size +MIN_RECV_PACKET = struct.calcsize(REQ_PKT_FMT) +# The header sizes don't deviate +assert struct.calcsize(REQ_PKT_FMT) == struct.calcsize(RES_PKT_FMT) + +EXTRA_HDR_FMTS={ + CMD_SET: SET_PKT_FMT, + CMD_ADD: SET_PKT_FMT, + CMD_REPLACE: SET_PKT_FMT, + CMD_INCR: INCRDECR_PKT_FMT, + CMD_DECR: INCRDECR_PKT_FMT, + CMD_DELETE: DEL_PKT_FMT, + CMD_FLUSH: FLUSH_PKT_FMT, +} + +EXTRA_HDR_SIZES=dict( + [(k, struct.calcsize(v)) for (k,v) in EXTRA_HDR_FMTS.items()]) + +ERR_UNKNOWN_CMD = 0x81 +ERR_NOT_FOUND = 0x1 +ERR_EXISTS = 0x2 +ERR_AUTH = 0x20 +ERR_AUTH_CONTINUE = 0x21 diff --git a/setup.py b/setup.py @@ -0,0 +1,9 @@ +#!/usr/bin/python +from setuptools import setup, find_packages + +setup(name = 'collectd_memcached_bucket', + version = '0.1a1', + packages = find_packages(), +) + + diff --git a/types.db b/types.db @@ -0,0 +1 @@ +memcached_bucket cmd_get:COUNTER:0:134217728, cmd_set:COUNTER:0:134217728, get_hits:COUNTER:0:134217728, get_misses:COUNTER:0:134217728, cas_hits:COUNTER:0:134217728, cas_misses:COUNTER:0:134217728, incr_hits:COUNTER:0:134217728, incr_misses:COUNTER:0:134217728, decr_hits:COUNTER:0:134217728, decr_misses:COUNTER:0:134217728, delete_hits:COUNTER:0:134217728, delete_misses:COUNTER:0:134217728