您好,登錄后才能下訂單哦!
Memcached 作用與使用 基本介紹
1,對于緩存的存取方式,簡言之,就是以鍵值對的形式將數據保存在內存中。在日常業務中涉及的操作無非就是增刪改查。加入緩存機制后,查詢的時候,對數據進行緩存,增刪改的時候,清除緩存即可。這其中對于緩存的閉合就非常重要,如果緩存沒有及時得到更新,那用戶就會獲取到過期數據,就會產生問題。
2,對于單一業務的緩存管理(數據庫中只操作單表),只需生成一個key,查詢時,使用key,置入緩存;增刪改時,使用key,清除緩存。將key與表綁定,操作相對簡單。
3,但是在現實業務中,更多的是對關聯表的增刪改查(數據庫多表操作),業務之間互相關聯,數據庫中的某張表不止一個業務再進行操作,將緩存攔截在service層,對業務進行緩存,對多表進行緩存。
4,業務層緩存實現策略:
4.1,在緩存中建立一個key為"union_query",value為“hashmap<prefix,uqversion>(‘簡稱uqmap’)“的緩存,prefix保存的是當前業務操作涉及到的數據庫表名的組合(數據庫表名的唯一性),使用‘|’分隔(例 prefix=“A|B”,此次業務將操作A表與B表),uqversion是業務版本號,從0開始遞增。
4.2,調用一個查詢業務時,對數據進行緩存,設置operation為1,告訴cache對象,這是一個緩存操作,例如調用 queryAB(args[])方法時,cache對象切入,將prefix(即”A|B“)與uqversion(初始化為0),存入uqmap中進行緩存。
4.3,將prefix,uqversion,方法明+參數,進行拼接,使用md5進行加密后作為一個key,將方法的結果集作為value,進行緩存。至此緩存成功。
4.4,當第二個請求來調用queryAB時,cache對象切入,首先,查詢uqmap對象,使用prefix找到對應的uqversion,然后,通過拼接加密獲取key,最后取得結果集進行返回。
4.5,當有一個updateA方法被調用時,設置operation為4,告訴cache對象,這是一個刪除緩存的操作,此時prefix的值為“A”,cache對象切入,獲取全局的uqmap,遍歷其中的prefix,是否包含了表A的名稱:如果包含,則更新此prefix的uqversion進行自增,uqversion一旦發生變化,4.3中組合的key將不復存在,業務緩存也就消失了。(對于復雜的updateAB方法,遍歷prefix要復雜一點,可以實現)
4.6,當第三個請求來調用queryAB時,可以獲取到uqversion,組合成key后,但是沒有對應的value。此時確定緩存不存在時,繼續正常執行方法,獲取結果集,返回給客戶的同時,將結果集進行緩存。
5,對于緩存的操作,網上有三種api可以選擇(memcached client forjava、spymemcached、xmemcached),具體的好壞,本人在這就不做分析。本人使用的是XMemcached api。
具體實現細節:
1,新建 @interface Annotation{ } 定義一個注解 @Annotation,一個注解是一個類。定義緩存策略。
import java.lang.annotation.Documented; import java.lang.annotation.ElementType; import java.lang.annotation.Inherited; import java.lang.annotation.Retention; import java.lang.annotation.RetentionPolicy; import java.lang.annotation.Target; /** * 用于查找的時候,放置緩存信息 * @author shufeng */ @Target(ElementType.METHOD) @Retention(RetentionPolicy.RUNTIME) @Documented @Inherited public @interface XmemCache{ /** * 值為當前操作的表名,表名唯一 * 涉及到多表操作,使用|分隔 */ String prefix() default ""; /* * 緩存有效期 設置,單位為秒 * 指定間隔時間,默認值為3600秒(1小時) * */ int interval() default 3600; /** * 1 從cache里取值,如果未置入cache,則置入 * 2 replace cache value 未擴展 * 3 replace cache value,并返回舊值 未擴展 * 4 remove cache key 從cache里刪除對應的緩存 * 5 remove cache key 從cache里刪除對應的緩存,并返回未刪除之前的值 未擴展 **/ int operation() default 1; }
2,memcache基礎操作類,對一些常用方法進行封裝,對memcachedclient進行配置
import java.io.IOException; import java.io.InputStream; import java.util.HashMap; import java.util.Iterator; import java.util.Map; import java.util.Properties; import java.util.Set; import java.util.concurrent.TimeoutException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.DisposableBean; import com.node.hlhw.rbac.api.constant.Constant; import net.rubyeye.xmemcached.GetsResponse; import net.rubyeye.xmemcached.MemcachedClient; import net.rubyeye.xmemcached.MemcachedClientBuilder; import net.rubyeye.xmemcached.XMemcachedClientBuilder; import net.rubyeye.xmemcached.command.BinaryCommandFactory; import net.rubyeye.xmemcached.exception.MemcachedException; import net.rubyeye.xmemcached.impl.KetamaMemcachedSessionLocator; import net.rubyeye.xmemcached.transcoders.SerializingTranscoder; import net.rubyeye.xmemcached.utils.AddrUtil; /** * @author Melody shufeng * 對memcachedclient進行封裝,添加一下常用方法 */ public class MemcachedOperate implements DisposableBean { /* * timeout - Operation timeout,if the method is not returned in this * time,throw TimeoutException timeout - operation timeout,in milliseconds * exp - An expiration time, in seconds. Can be up to 30 days. After 30 * days, is treated as a unix timestamp of an exact date. value - stored * data */ private static final Logger logger = LoggerFactory.getLogger(MemcachedOperate.class); private static Properties PROPERTIES = new Properties(); private static String MEMCACHED_SETTING = "memcached.properties"; private static MemcachedClient memcachedClient; public static MemcachedClient getClient(){ return memcachedClient; } /** * 靜態代碼塊,類加載時,初始化緩存客戶端 * 確保只創建一個client實例 * author shufeng */ static { InputStream in = Object.class.getResourceAsStream("/" + MEMCACHED_SETTING); try { PROPERTIES.load(in); } catch (IOException e) { e.printStackTrace(); } String servers = PROPERTIES.getProperty("memcached.servers", ""); if (null != servers && !"".equals(servers)) { try { logger.debug("啟動memcached連接"); MemcachedClientBuilder builder = new XMemcachedClientBuilder(AddrUtil.getAddresses(servers)); builder.setConnectionPoolSize(100); builder.setFailureMode(true); builder.setCommandFactory(new BinaryCommandFactory()); builder.setSessionLocator(new KetamaMemcachedSessionLocator()); builder.setTranscoder(new SerializingTranscoder()); memcachedClient = builder.build(); memcachedClient.setEnableHeartBeat(false); // 關閉心跳 memcachedClient.flushAll(); // 清空緩存 } catch (IOException e) { e.printStackTrace(); } catch (TimeoutException e) { e.printStackTrace(); } catch (InterruptedException e) { e.printStackTrace(); } catch (MemcachedException e) { e.printStackTrace(); } catch (Exception e) { e.printStackTrace(); } } } /** * @param key * @return 獲取value */ public static Object get(String key) { Object object = null; try { object = memcachedClient.get(key); } catch (TimeoutException e) { e.printStackTrace(); } catch (InterruptedException e) { e.printStackTrace(); } catch (MemcachedException e) { e.printStackTrace(); } return object; } public static void setWithNoReply(String key, int exp, Object value) { try { memcachedClient.setWithNoReply(key, exp, value); } catch (InterruptedException e) { e.printStackTrace(); } catch (MemcachedException e) { e.printStackTrace(); } } /** * 查詢聯表的業務版本號 如果為空,則初始化 * * @param prefix * @return */ @SuppressWarnings("unchecked") public static Long getUnionQueryVersion(String prefix) { try { Map<String, Long> uqmap = null; GetsResponse<Object> getsresponse = memcachedClient.gets(Constant.UNION_QUERY); if (getsresponse == null) { uqmap = new HashMap<String, Long>(); Long uqversion = new Long(1); // 初始化版本號 uqmap.put(prefix, uqversion); if (memcachedClient.cas(Constant.UNION_QUERY, 0, uqmap, 0)) { // 檢測插入之前是否被修改過 return uqversion; // 插入成功 } else { // 插入失敗,說明在代碼運行期間,已經有其他線程去修改了unionquery的緩存,重新進行查詢 return getUnionQueryVersion(prefix); } } else { long cas = getsresponse.getCas(); Object uqobj = getsresponse.getValue(); if (uqobj == null) { // 不存在對象 uqmap = new HashMap<String, Long>(); Long uqversion = new Long(1); // 初始化版本號 uqmap.put(prefix, uqversion); if (memcachedClient.cas(Constant.UNION_QUERY, 0, uqmap, cas)) { // 檢測插入之前是否被修改過 return uqversion; // 插入成功 } else { // 插入失敗,說明在代碼運行期間,已經有其他線程去修改了unionquery的緩存,重新進行查詢 return getUnionQueryVersion(prefix); } } else { uqmap = (Map<String, Long>) uqobj; Long uqversion = uqmap.get(prefix); if (uqversion == null) { // 不存在此業務版本 uqversion = new Long(1); // 初始化版本號 uqmap.put(prefix, uqversion); if (memcachedClient.cas(Constant.UNION_QUERY, 0, uqmap, cas)) { // 檢測插入之前是否被修改過 return uqversion; // 插入成功 } else { // 插入失敗,說明在代碼運行期間,已經有其他線程去修改了unionquery的緩存,重新進行查詢 return getUnionQueryVersion(prefix); } } else { return uqversion; } } } } catch (TimeoutException | InterruptedException | MemcachedException e) { e.printStackTrace(); System.err.println("getUnionQueryVersion---Exception"); } return 1L; } /** * 查詢單表的業務版本號 如果為空,則初始化 * * @return */ public static Long getVersion(String prefix) { try { GetsResponse<Object> getsresponse = memcachedClient.gets(prefix); if (getsresponse == null) { Long pfversion = new Long(1); if (memcachedClient.cas(prefix, 0, pfversion, 0)) { return pfversion; } else { return getVersion(prefix); } } else { Object pfobj = getsresponse.getValue(); long cas = getsresponse.getCas(); if (pfobj == null) { Long pfversion = new Long(1); if (memcachedClient.cas(prefix, 0, pfversion, cas)) { return pfversion; } else { return getVersion(prefix); } } else { return (Long) pfobj; } } } catch (TimeoutException | InterruptedException | MemcachedException e) { e.printStackTrace(); System.err.println("getVersion---Exception"); } return 1L; } /** * shufeng 更新 多表版本號 * 由于存在線程安全問題 ,會覆蓋uqmap,更新unionquery業務版本號 * 使用cas方法解決線程安全問題 * 更新unionquery中key包含p1或p2或p3的version * @param prefix */ @SuppressWarnings("unchecked") public static void updateUnionQueryVersion(String prefix) { try { Map<String, Long> uqmap = null; GetsResponse<Object> getsresponse = memcachedClient.gets(Constant.UNION_QUERY); if (getsresponse == null) { return; } else { Object uqobj = getsresponse.getValue(); long cas = getsresponse.getCas(); if (uqobj == null) { return; } else { uqmap = (HashMap<String, Long>) uqobj; Set<String> uqset = uqmap.keySet(); // 遍歷unionquery中的key Iterator<String> quit = uqset.iterator(); String uqkey = ""; boolean uqflag = false; while (quit.hasNext()) { uqkey = quit.next(); if (("|" + uqkey + "|").contains("|" + prefix + "|")) { // key中包含prefix uqmap.put(uqkey, uqmap.get(uqkey) + 1); // 更新map uqflag = true; } } if (uqflag) { if (!memcachedClient.cas(Constant.UNION_QUERY, 0, uqmap, cas)) { updateUnionQueryVersion(prefix); } } } } } catch (TimeoutException | InterruptedException | MemcachedException e) { e.printStackTrace(); System.err.println("updateUnionQueryVersion---Exception"); } } /** * 更新單表版本號 * * @param prefix * @return */ public static void updateVersion(String prefix) { try { GetsResponse<Object> getsresponse; getsresponse = memcachedClient.gets(prefix); if (getsresponse == null) { return ; } else { Object pfobj = getsresponse.getValue(); long cas = getsresponse.getCas(); if (pfobj == null) { return ; } else { Long pfversion = (Long) pfobj; pfversion += 1; if (!memcachedClient.cas(prefix, 0, pfversion, cas)) { updateVersion(prefix); } } } } catch (TimeoutException | InterruptedException | MemcachedException e) { e.printStackTrace(); System.err.println("updateVersion---Exception"); } } public void shutdown() { try { memcachedClient.shutdown(); } catch (IOException e) { e.printStackTrace(); } } @Override public void destroy() throws Exception { shutdown(); } }
3,結合spring aop 配置緩存,使用spring aop來切入業務層加入緩存,與業務進行解耦。使用注解進行方便配置。
import java.lang.reflect.Method; import org.aspectj.lang.ProceedingJoinPoint; import org.aspectj.lang.Signature; import org.aspectj.lang.annotation.Around; import org.aspectj.lang.annotation.Aspect; import org.aspectj.lang.annotation.Before; import org.aspectj.lang.annotation.Pointcut; import org.aspectj.lang.reflect.MethodSignature; import org.springframework.stereotype.Component; import com.alibaba.fastjson.JSON; import com.node.hlhw.common.cache.XmemCache; import com.node.hlhw.common.digest.Md5Utils; @Component @Aspect public class MemcachedAop { @Pointcut("execution (* com.node.hlhw.*.service.impl.*.*(..))") public void pointcut() { } // 方法執行前調用 @Before("pointcut()") public void before() { } // 方法執行的前后調用 /** * * 改進建議:使用uuid作為版本號,減少版本號的讀取,直接生成uuid,進行緩存 * 線程安全問題:存在線程安全問題,但是針對于緩存,問題不大。 * 多線程同一時間重復覆蓋一個業務id,還是可以更新緩存 * * @param call * @throws Throwable */ @Around("pointcut()") public Object doAround(ProceedingJoinPoint call) throws Throwable { Object result = null; // 檢測是否存在memcached客戶端實例 if (MemcachedOperate.getClient() == null) { System.err.println("memcached client not exist"); result = call.proceed(); return result; } Signature signature = call.getSignature(); MethodSignature methodSignature = (MethodSignature) signature; Method method = methodSignature.getMethod(); if(!method.isAnnotationPresent(XmemCache.class)){ result = call.proceed(); return result; } XmemCache xmemcache = method.getAnnotation(XmemCache.class); // 獲取操作方法 int operation = xmemcache.operation(); // 獲取注解前綴,實際使用是為各個業務包名稱,一般為表名 String prefix = xmemcache.prefix(); // 無前綴 if(prefix==null||"".equals(prefix)){ result = call.proceed(); return result; } // 獲取注解配置memcached死亡時間 秒單位 int interval = xmemcache.interval(); switch (operation) { case 1: // 1 從cache里取值,如果未置入cache,則置入 // 判斷prefix是否涉及多表,查看是否包含| if (prefix.contains("|")) { Long uqversion = MemcachedOperate.getUnionQueryVersion(prefix); String combinedkey = generCombinedKey(prefix, uqversion, method, call.getArgs()); Object resultobj = MemcachedOperate.get(combinedkey); if(resultobj == null){ result = call.proceed(); MemcachedOperate.setWithNoReply(combinedkey, interval, JSON.toJSONString(result));// 緩存數據 }else{ Class<?> returnType = ((MethodSignature) signature).getReturnType(); result = JSON.parseObject(resultobj.toString(), returnType); } } else { // 單表操作 Long pfversion = MemcachedOperate.getVersion(prefix); String combinedkey = generCombinedKey(prefix, pfversion, method, call.getArgs()); Object resultobj = MemcachedOperate.get(combinedkey); if(resultobj == null){ result = call.proceed(); MemcachedOperate.setWithNoReply(combinedkey, interval, JSON.toJSONString(result));// 緩存數據 }else{ Class<?> returnType = ((MethodSignature) signature).getReturnType(); result = JSON.parseObject(resultobj.toString(), returnType); } } break; case 2: // 2 replace cache value break; case 3: break; case 4: // 4 remove cache key 從cache里刪除對應 業務版本的緩存 /* * 更新unionquery業務版本號 * 0,切割 prefix為p1、p2、p3 * 1,更新prefix為p1或p2或p3的version * 2,更新unionquery中key包含p1或p2或p3的version */ if (prefix.contains("|")) { // 表示涉及到多表,需要清除 單表的緩存,與聯表中 包含 當前部分的 緩存 String[] prefixs = prefix.split("\\|"); // 0.切割 prefix為p1、p2、p3 for(String pf : prefixs){ MemcachedOperate.updateVersion(pf); // 1,更新prefix為p1或p2或p3的version MemcachedOperate.updateUnionQueryVersion(pf); } }else{ // 沒有涉及到多表的時候 MemcachedOperate.updateVersion(prefix); MemcachedOperate.updateUnionQueryVersion(prefix); } result = call.proceed(); break; default: result = call.proceed(); break; } return result; } /** * 組裝key值 * @param key * @param version * @param method * @param args * @return */ private String generCombinedKey(String key, Long version, Method method, Object[] args) { StringBuffer sb = new StringBuffer(); // 獲取方法名 String methodName = method.getName(); // 獲取參數類型 Object[] classTemps = method.getParameterTypes(); // 存入方法名 sb.append(methodName); for (int i = 0; i < args.length; i++) { sb.append(classTemps[i] + "&"); if (null == args[i]) { sb.append("null"); } else if ("".equals(args[i])) { sb.append("*"); } else { String tt = JSON.toJSONString(args[i]); sb.append(tt); } } sb.append(key); sb.append(version.toString()); String temp = Md5Utils.getMD5(sb.toString()); return temp; } }
4,properties文件中配置memcached服務器地址
#host1:port1,host2:port2 memcached.servers=192.168.1.1:11211,192.168.1.2:11211
5,修改spring配置文件,聲明自動為spring容器中那些配置@aspectJ切面的bean創建代理,織入切面。
<aop:aspectj-autoproxy proxy-target-class="true"/>
6,service層使用注解方式切入緩存
import java.util.Arrays; import java.util.Date; import java.util.List; import java.util.Map; import org.apache.ibatis.session.RowBounds; import org.apache.log4j.Logger; import org.springframework.beans.factory.annotation.Autowired; import com.alibaba.dubbo.common.utils.StringUtils; import com.alibaba.dubbo.config.annotation.Service; import com.node.hlhw.common.cache.XmemCache; import com.node.hlhw.common.digest.ApplicationUtils; import com.node.hlhw.core.service.BaseService; import com.node.hlhw.core.store.IBaseStore; import com.node.hlhw.core.store.PageParam; import com.node.hlhw.rbac.api.dao.UserRoleDao; import com.node.hlhw.rbac.api.entity.UserRole; import com.node.hlhw.rbac.api.service.UserRoleService; /** * @author Melody * 處理用戶角色 */ @Service(version = "1.0.0") public class UserRoleServiceImpl extends BaseService<UserRole> implements UserRoleService { private static final Logger logger = Logger .getLogger(UserRoleServiceImpl.class); @Autowired public UserRoleDao userRoleDao; @Override protected IBaseStore<UserRole> getBaseDao() { return userRoleDao; } /* * 單表操作,prefix為表名,operation為4,只進行緩存的刪除操作 */ @XmemCache(prefix="userrole",operation=4) public void insertUserRole(UserRole userRole) throws Exception { userRoleDao.insertUserRole(userRole); logger.info("插入用戶角色數據"); } /* (non-Javadoc) * 此方法操作了兩個表,role與userole,使用‘|’進行分隔 * operation為1,表示緩存操作,對結果集進行緩存 * interval表示緩存時間默認不填為3600秒,也可指定具體時長 */ @Override @XmemCache(prefix="role|userrole",interval=3600 , operation=1) public List<Map<String, Object>> selectUserRoleList(UserRole userrole, PageParam pageParam) throws Exception { RowBounds rowBounds = new RowBounds(pageParam.getOffset(),pageParam.getLimit()); List<Map<String, Object>> list = userRoleDao.selectUserRoleList(userrole,rowBounds); return list ; } @Override @XmemCache(prefix="userrole" , operation=4) public void modifyUserRole(UserRole userrole, String[] roleids)throws Exception { //刪除所包含的角色 userRoleDao.deleteByUserRole(userrole); for(String roleid : roleids){ if(!StringUtils.isEmpty(roleid)){ userrole.setCreatetime(new Date()); userrole.setRoleid(roleid); userrole.setUuid(ApplicationUtils.getUUID()); userRoleDao.insertUserRole(userrole); } } } @Override @XmemCache(prefix="userrole" , operation=1) public boolean existsRef(String roleids)throws Exception { String [] roleid = roleids.split(","); List<String> roleidlist = Arrays.asList(roleid); return userRoleDao.existsRef(roleidlist)>0?true:false; } }
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。