Memcached 作用与使用 基本介绍
1,对于缓存的存取方式,简言之,就是以键值对的形式将数据保存在内存中。在日常业务中涉及的操作无非就是增删改查。加入缓存机制后,查询的时候,对数据进行缓存,增删改的时候,清除缓存即可。这其中对于缓存的闭合就非常重要,如果缓存没有及时得到更新,那用户就会获取到过期数据,就会产生问题。
2,对于单一业务的缓存管理(数据库中只操作单表),只需生成一个key,查询时,使用key,置入缓存;增删改时,使用key,清除缓存。将key与表绑定,操作相对简单。
3,但是在现实业务中,更多的是对关联表的增删改查(数据库多表操作),业务之间互相关联,数据库中的某张表不止一个业务再进行操作,将缓存拦截在service层,对业务进行缓存,对多表进行缓存。
4,业务层缓存实现策略:
4.1,在缓存中建立一个key为"union_query",value为“hashmap<prefix,uqversion>(‘简称uqmap’)“的缓存,prefix保存的是当前业务操作涉及到的数据库表名的组合(数据库表名的唯一性),使用‘|’分隔(例 prefix=“A|B”,此次业务将操作A表与B表),uqversion是业务版本号,从0开始递增。
4.2,调用一个查询业务时,对数据进行缓存,设置operation为1,告诉cache对象,这是一个缓存操作,例如调用 queryAB(args[])方法时,cache对象切入,将prefix(即”A|B“)与uqversion(初始化为0),存入uqmap中进行缓存。
4.3,将prefix,uqversion,方法明+参数,进行拼接,使用md5进行加密后作为一个key,将方法的结果集作为value,进行缓存。至此缓存成功。
4.4,当第二个请求来调用queryAB时,cache对象切入,首先,查询uqmap对象,使用prefix找到对应的uqversion,然后,通过拼接加密获取key,最后取得结果集进行返回。
4.5,当有一个updateA方法被调用时,设置operation为4,告诉cache对象,这是一个删除缓存的操作,此时prefix的值为“A”,cache对象切入,获取全局的uqmap,遍历其中的prefix,是否包含了表A的名称:如果包含,则更新此prefix的uqversion进行自增,uqversion一旦发生变化,4.3中组合的key将不复存在,业务缓存也就消失了。(对于复杂的updateAB方法,遍历prefix要复杂一点,可以实现)
4.6,当第三个请求来调用queryAB时,可以获取到uqversion,组合成key后,但是没有对应的value。此时确定缓存不存在时,继续正常执行方法,获取结果集,返回给客户的同时,将结果集进行缓存。
5,对于缓存的操作,网上有三种api可以选择(memcached client forjava、spymemcached、xmemcached),具体的好坏,本人在这就不做分析。本人使用的是XMemcached api。
具体实现细节:
1,新建 @interface Annotation{ } 定义一个注解 @Annotation,一个注解是一个类。定义缓存策略。
import java.lang.annotation.Documented; import java.lang.annotation.ElementType; import java.lang.annotation.Inherited; import java.lang.annotation.Retention; import java.lang.annotation.RetentionPolicy; import java.lang.annotation.Target; /** * 用于查找的时候,放置缓存信息 * @author shufeng */ @Target(ElementType.METHOD) @Retention(RetentionPolicy.RUNTIME) @Documented @Inherited public @interface XmemCache{ /** * 值为当前操作的表名,表名唯一 * 涉及到多表操作,使用|分隔 */ String prefix() default ""; /* * 缓存有效期 设置,单位为秒 * 指定间隔时间,默认值为3600秒(1小时) * */ int interval() default 3600; /** * 1 从cache里取值,如果未置入cache,则置入 * 2 replace cache value 未扩展 * 3 replace cache value,并返回旧值 未扩展 * 4 remove cache key 从cache里删除对应的缓存 * 5 remove cache key 从cache里删除对应的缓存,并返回未删除之前的值 未扩展 **/ int operation() default 1; }
2,memcache基础操作类,对一些常用方法进行封装,对memcachedclient进行配置
import java.io.IOException; import java.io.InputStream; import java.util.HashMap; import java.util.Iterator; import java.util.Map; import java.util.Properties; import java.util.Set; import java.util.concurrent.TimeoutException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.DisposableBean; import com.node.hlhw.rbac.api.constant.Constant; import net.rubyeye.xmemcached.GetsResponse; import net.rubyeye.xmemcached.MemcachedClient; import net.rubyeye.xmemcached.MemcachedClientBuilder; import net.rubyeye.xmemcached.XMemcachedClientBuilder; import net.rubyeye.xmemcached.command.BinaryCommandFactory; import net.rubyeye.xmemcached.exception.MemcachedException; import net.rubyeye.xmemcached.impl.KetamaMemcachedSessionLocator; import net.rubyeye.xmemcached.transcoders.SerializingTranscoder; import net.rubyeye.xmemcached.utils.AddrUtil; /** * @author Melody shufeng * 对memcachedclient进行封装,添加一下常用方法 */ public class MemcachedOperate implements DisposableBean { /* * timeout - Operation timeout,if the method is not returned in this * time,throw TimeoutException timeout - operation timeout,in milliseconds * exp - An expiration time, in seconds. Can be up to 30 days. After 30 * days, is treated as a unix timestamp of an exact date. value - stored * data */ private static final Logger logger = LoggerFactory.getLogger(MemcachedOperate.class); private static Properties PROPERTIES = new Properties(); private static String MEMCACHED_SETTING = "memcached.properties"; private static MemcachedClient memcachedClient; public static MemcachedClient getClient(){ return memcachedClient; } /** * 静态代码块,类加载时,初始化缓存客户端 * 确保只创建一个client实例 * author shufeng */ static { InputStream in = Object.class.getResourceAsStream("/" + MEMCACHED_SETTING); try { PROPERTIES.load(in); } catch (IOException e) { e.printStackTrace(); } String servers = PROPERTIES.getProperty("memcached.servers", ""); if (null != servers && !"".equals(servers)) { try { logger.debug("启动memcached连接"); MemcachedClientBuilder builder = new XMemcachedClientBuilder(AddrUtil.getAddresses(servers)); builder.setConnectionPoolSize(100); builder.setFailureMode(true); builder.setCommandFactory(new BinaryCommandFactory()); builder.setSessionLocator(new KetamaMemcachedSessionLocator()); builder.setTranscoder(new SerializingTranscoder()); memcachedClient = builder.build(); memcachedClient.setEnableHeartBeat(false); // 关闭心跳 memcachedClient.flushAll(); // 清空缓存 } catch (IOException e) { e.printStackTrace(); } catch (TimeoutException e) { e.printStackTrace(); } catch (InterruptedException e) { e.printStackTrace(); } catch (MemcachedException e) { e.printStackTrace(); } catch (Exception e) { e.printStackTrace(); } } } /** * @param key * @return 获取value */ public static Object get(String key) { Object object = null; try { object = memcachedClient.get(key); } catch (TimeoutException e) { e.printStackTrace(); } catch (InterruptedException e) { e.printStackTrace(); } catch (MemcachedException e) { e.printStackTrace(); } return object; } public static void setWithNoReply(String key, int exp, Object value) { try { memcachedClient.setWithNoReply(key, exp, value); } catch (InterruptedException e) { e.printStackTrace(); } catch (MemcachedException e) { e.printStackTrace(); } } /** * 查询联表的业务版本号 如果为空,则初始化 * * @param prefix * @return */ @SuppressWarnings("unchecked") public static Long getUnionQueryVersion(String prefix) { try { Map<String, Long> uqmap = null; GetsResponse<Object> getsresponse = memcachedClient.gets(Constant.UNION_QUERY); if (getsresponse == null) { uqmap = new HashMap<String, Long>(); Long uqversion = new Long(1); // 初始化版本号 uqmap.put(prefix, uqversion); if (memcachedClient.cas(Constant.UNION_QUERY, 0, uqmap, 0)) { // 检测插入之前是否被修改过 return uqversion; // 插入成功 } else { // 插入失败,说明在代码运行期间,已经有其他线程去修改了unionquery的缓存,重新进行查询 return getUnionQueryVersion(prefix); } } else { long cas = getsresponse.getCas(); Object uqobj = getsresponse.getValue(); if (uqobj == null) { // 不存在对象 uqmap = new HashMap<String, Long>(); Long uqversion = new Long(1); // 初始化版本号 uqmap.put(prefix, uqversion); if (memcachedClient.cas(Constant.UNION_QUERY, 0, uqmap, cas)) { // 检测插入之前是否被修改过 return uqversion; // 插入成功 } else { // 插入失败,说明在代码运行期间,已经有其他线程去修改了unionquery的缓存,重新进行查询 return getUnionQueryVersion(prefix); } } else { uqmap = (Map<String, Long>) uqobj; Long uqversion = uqmap.get(prefix); if (uqversion == null) { // 不存在此业务版本 uqversion = new Long(1); // 初始化版本号 uqmap.put(prefix, uqversion); if (memcachedClient.cas(Constant.UNION_QUERY, 0, uqmap, cas)) { // 检测插入之前是否被修改过 return uqversion; // 插入成功 } else { // 插入失败,说明在代码运行期间,已经有其他线程去修改了unionquery的缓存,重新进行查询 return getUnionQueryVersion(prefix); } } else { return uqversion; } } } } catch (TimeoutException | InterruptedException | MemcachedException e) { e.printStackTrace(); System.err.println("getUnionQueryVersion---Exception"); } return 1L; } /** * 查询单表的业务版本号 如果为空,则初始化 * * @return */ public static Long getVersion(String prefix) { try { GetsResponse<Object> getsresponse = memcachedClient.gets(prefix); if (getsresponse == null) { Long pfversion = new Long(1); if (memcachedClient.cas(prefix, 0, pfversion, 0)) { return pfversion; } else { return getVersion(prefix); } } else { Object pfobj = getsresponse.getValue(); long cas = getsresponse.getCas(); if (pfobj == null) { Long pfversion = new Long(1); if (memcachedClient.cas(prefix, 0, pfversion, cas)) { return pfversion; } else { return getVersion(prefix); } } else { return (Long) pfobj; } } } catch (TimeoutException | InterruptedException | MemcachedException e) { e.printStackTrace(); System.err.println("getVersion---Exception"); } return 1L; } /** * shufeng 更新 多表版本号 * 由于存在线程安全问题 ,会覆盖uqmap,更新unionquery业务版本号 * 使用cas方法解决线程安全问题 * 更新unionquery中key包含p1或p2或p3的version * @param prefix */ @SuppressWarnings("unchecked") public static void updateUnionQueryVersion(String prefix) { try { Map<String, Long> uqmap = null; GetsResponse<Object> getsresponse = memcachedClient.gets(Constant.UNION_QUERY); if (getsresponse == null) { return; } else { Object uqobj = getsresponse.getValue(); long cas = getsresponse.getCas(); if (uqobj == null) { return; } else { uqmap = (HashMap<String, Long>) uqobj; Set<String> uqset = uqmap.keySet(); // 遍历unionquery中的key Iterator<String> quit = uqset.iterator(); String uqkey = ""; boolean uqflag = false; while (quit.hasNext()) { uqkey = quit.next(); if (("|" + uqkey + "|").contains("|" + prefix + "|")) { // key中包含prefix uqmap.put(uqkey, uqmap.get(uqkey) + 1); // 更新map uqflag = true; } } if (uqflag) { if (!memcachedClient.cas(Constant.UNION_QUERY, 0, uqmap, cas)) { updateUnionQueryVersion(prefix); } } } } } catch (TimeoutException | InterruptedException | MemcachedException e) { e.printStackTrace(); System.err.println("updateUnionQueryVersion---Exception"); } } /** * 更新单表版本号 * * @param prefix * @return */ public static void updateVersion(String prefix) { try { GetsResponse<Object> getsresponse; getsresponse = memcachedClient.gets(prefix); if (getsresponse == null) { return ; } else { Object pfobj = getsresponse.getValue(); long cas = getsresponse.getCas(); if (pfobj == null) { return ; } else { Long pfversion = (Long) pfobj; pfversion += 1; if (!memcachedClient.cas(prefix, 0, pfversion, cas)) { updateVersion(prefix); } } } } catch (TimeoutException | InterruptedException | MemcachedException e) { e.printStackTrace(); System.err.println("updateVersion---Exception"); } } public void shutdown() { try { memcachedClient.shutdown(); } catch (IOException e) { e.printStackTrace(); } } @Override public void destroy() throws Exception { shutdown(); } }
3,结合spring aop 配置缓存,使用spring aop来切入业务层加入缓存,与业务进行解耦。使用注解进行方便配置。
import java.lang.reflect.Method; import org.aspectj.lang.ProceedingJoinPoint; import org.aspectj.lang.Signature; import org.aspectj.lang.annotation.Around; import org.aspectj.lang.annotation.Aspect; import org.aspectj.lang.annotation.Before; import org.aspectj.lang.annotation.Pointcut; import org.aspectj.lang.reflect.MethodSignature; import org.springframework.stereotype.Component; import com.alibaba.fastjson.JSON; import com.node.hlhw.common.cache.XmemCache; import com.node.hlhw.common.digest.Md5Utils; @Component @Aspect public class MemcachedAop { @Pointcut("execution (* com.node.hlhw.*.service.impl.*.*(..))") public void pointcut() { } // 方法执行前调用 @Before("pointcut()") public void before() { } // 方法执行的前后调用 /** * * 改进建议:使用uuid作为版本号,减少版本号的读取,直接生成uuid,进行缓存 * 线程安全问题:存在线程安全问题,但是针对于缓存,问题不大。 * 多线程同一时间重复覆盖一个业务id,还是可以更新缓存 * * @param call * @throws Throwable */ @Around("pointcut()") public Object doAround(ProceedingJoinPoint call) throws Throwable { Object result = null; // 检测是否存在memcached客户端实例 if (MemcachedOperate.getClient() == null) { System.err.println("memcached client not exist"); result = call.proceed(); return result; } Signature signature = call.getSignature(); MethodSignature methodSignature = (MethodSignature) signature; Method method = methodSignature.getMethod(); if(!method.isAnnotationPresent(XmemCache.class)){ result = call.proceed(); return result; } XmemCache xmemcache = method.getAnnotation(XmemCache.class); // 获取操作方法 int operation = xmemcache.operation(); // 获取注解前缀,实际使用是为各个业务包名称,一般为表名 String prefix = xmemcache.prefix(); // 无前缀 if(prefix==null||"".equals(prefix)){ result = call.proceed(); return result; } // 获取注解配置memcached死亡时间 秒单位 int interval = xmemcache.interval(); switch (operation) { case 1: // 1 从cache里取值,如果未置入cache,则置入 // 判断prefix是否涉及多表,查看是否包含| if (prefix.contains("|")) { Long uqversion = MemcachedOperate.getUnionQueryVersion(prefix); String combinedkey = generCombinedKey(prefix, uqversion, method, call.getArgs()); Object resultobj = MemcachedOperate.get(combinedkey); if(resultobj == null){ result = call.proceed(); MemcachedOperate.setWithNoReply(combinedkey, interval, JSON.toJSONString(result));// 缓存数据 }else{ Class<?> returnType = ((MethodSignature) signature).getReturnType(); result = JSON.parseObject(resultobj.toString(), returnType); } } else { // 单表操作 Long pfversion = MemcachedOperate.getVersion(prefix); String combinedkey = generCombinedKey(prefix, pfversion, method, call.getArgs()); Object resultobj = MemcachedOperate.get(combinedkey); if(resultobj == null){ result = call.proceed(); MemcachedOperate.setWithNoReply(combinedkey, interval, JSON.toJSONString(result));// 缓存数据 }else{ Class<?> returnType = ((MethodSignature) signature).getReturnType(); result = JSON.parseObject(resultobj.toString(), returnType); } } break; case 2: // 2 replace cache value break; case 3: break; case 4: // 4 remove cache key 从cache里删除对应 业务版本的缓存 /* * 更新unionquery业务版本号 * 0,切割 prefix为p1、p2、p3 * 1,更新prefix为p1或p2或p3的version * 2,更新unionquery中key包含p1或p2或p3的version */ if (prefix.contains("|")) { // 表示涉及到多表,需要清除 单表的缓存,与联表中 包含 当前部分的 缓存 String[] prefixs = prefix.split("\\|"); // 0.切割 prefix为p1、p2、p3 for(String pf : prefixs){ MemcachedOperate.updateVersion(pf); // 1,更新prefix为p1或p2或p3的version MemcachedOperate.updateUnionQueryVersion(pf); } }else{ // 没有涉及到多表的时候 MemcachedOperate.updateVersion(prefix); MemcachedOperate.updateUnionQueryVersion(prefix); } result = call.proceed(); break; default: result = call.proceed(); break; } return result; } /** * 组装key值 * @param key * @param version * @param method * @param args * @return */ private String generCombinedKey(String key, Long version, Method method, Object[] args) { StringBuffer sb = new StringBuffer(); // 获取方法名 String methodName = method.getName(); // 获取参数类型 Object[] classTemps = method.getParameterTypes(); // 存入方法名 sb.append(methodName); for (int i = 0; i < args.length; i++) { sb.append(classTemps[i] + "&"); if (null == args[i]) { sb.append("null"); } else if ("".equals(args[i])) { sb.append("*"); } else { String tt = JSON.toJSONString(args[i]); sb.append(tt); } } sb.append(key); sb.append(version.toString()); String temp = Md5Utils.getMD5(sb.toString()); return temp; } }
4,properties文件中配置memcached服务器地址
#host1:port1,host2:port2 memcached.servers=192.168.1.1:11211,192.168.1.2:11211
5,修改spring配置文件,声明自动为spring容器中那些配置@aspectJ切面的bean创建代理,织入切面。
<aop:aspectj-autoproxy proxy-target-class="true"/>
6,service层使用注解方式切入缓存
import java.util.Arrays; import java.util.Date; import java.util.List; import java.util.Map; import org.apache.ibatis.session.RowBounds; import org.apache.log4j.Logger; import org.springframework.beans.factory.annotation.Autowired; import com.alibaba.dubbo.common.utils.StringUtils; import com.alibaba.dubbo.config.annotation.Service; import com.node.hlhw.common.cache.XmemCache; import com.node.hlhw.common.digest.ApplicationUtils; import com.node.hlhw.core.service.BaseService; import com.node.hlhw.core.store.IBaseStore; import com.node.hlhw.core.store.PageParam; import com.node.hlhw.rbac.api.dao.UserRoleDao; import com.node.hlhw.rbac.api.entity.UserRole; import com.node.hlhw.rbac.api.service.UserRoleService; /** * @author Melody * 处理用户角色 */ @Service(version = "1.0.0") public class UserRoleServiceImpl extends BaseService<UserRole> implements UserRoleService { private static final Logger logger = Logger .getLogger(UserRoleServiceImpl.class); @Autowired public UserRoleDao userRoleDao; @Override protected IBaseStore<UserRole> getBaseDao() { return userRoleDao; } /* * 单表操作,prefix为表名,operation为4,只进行缓存的删除操作 */ @XmemCache(prefix="userrole",operation=4) public void insertUserRole(UserRole userRole) throws Exception { userRoleDao.insertUserRole(userRole); logger.info("插入用户角色数据"); } /* (non-Javadoc) * 此方法操作了两个表,role与userole,使用‘|’进行分隔 * operation为1,表示缓存操作,对结果集进行缓存 * interval表示缓存时间默认不填为3600秒,也可指定具体时长 */ @Override @XmemCache(prefix="role|userrole",interval=3600 , operation=1) public List<Map<String, Object>> selectUserRoleList(UserRole userrole, PageParam pageParam) throws Exception { RowBounds rowBounds = new RowBounds(pageParam.getOffset(),pageParam.getLimit()); List<Map<String, Object>> list = userRoleDao.selectUserRoleList(userrole,rowBounds); return list ; } @Override @XmemCache(prefix="userrole" , operation=4) public void modifyUserRole(UserRole userrole, String[] roleids)throws Exception { //删除所包含的角色 userRoleDao.deleteByUserRole(userrole); for(String roleid : roleids){ if(!StringUtils.isEmpty(roleid)){ userrole.setCreatetime(new Date()); userrole.setRoleid(roleid); userrole.setUuid(ApplicationUtils.getUUID()); userRoleDao.insertUserRole(userrole); } } } @Override @XmemCache(prefix="userrole" , operation=1) public boolean existsRef(String roleids)throws Exception { String [] roleid = roleids.split(","); List<String> roleidlist = Arrays.asList(roleid); return userRoleDao.existsRef(roleidlist)>0?true:false; } }
免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。