This article walks through how the Elasticsearch engine is started and what it looks like from the inside.
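Engine is the part of ES that sits closest to Lucene itself. It is a thin layer that wraps access to Lucene for a distributed environment. Its interface follows the command pattern, so an operation log, the translog, falls out of it naturally. In older versions the implementation class was called RobinEngine; newer versions renamed it and added several version types, but that makes little difference to this source-code walkthrough. The engine's two main concerns are the operation log and data versioning, and its highlight is how it uses locks. Rather than listing all of the code, we will look at it from two angles: encapsulation and concurrency.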
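To make the command-pattern point concrete, here is a minimal, self-contained sketch. It is not ES code: every name in it (Operation, CreateOp, DeleteOp, ToyEngine) is hypothetical, and a Map stands in for the Lucene index. Each call becomes an object; the engine applies it and then appends that same object to an in-memory list playing the role of the translog, so replaying the list rebuilds the state.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical command interface: every engine call is represented as an object.
interface Operation {
    void apply(Map<String, String> index);
}

// Hypothetical "create" command carrying the uid and the JSON source.
final class CreateOp implements Operation {
    final String uid;
    final String source;

    CreateOp(String uid, String source) {
        this.uid = uid;
        this.source = source;
    }

    public void apply(Map<String, String> index) {
        index.put(uid, source);
    }
}

// Hypothetical "delete" command.
final class DeleteOp implements Operation {
    final String uid;

    DeleteOp(String uid) {
        this.uid = uid;
    }

    public void apply(Map<String, String> index) {
        index.remove(uid);
    }
}

// Toy engine: apply the command, then append it to an in-memory "translog".
final class ToyEngine {
    final Map<String, String> index = new TreeMap<>();
    final List<Operation> translog = new ArrayList<>();

    void execute(Operation op) {
        op.apply(index);
        translog.add(op); // the command object itself is the log entry
    }

    // Recovery sketch: replay the logged commands against an empty index.
    static Map<String, String> replay(List<Operation> log) {
        Map<String, String> recovered = new TreeMap<>();
        for (Operation op : log) {
            op.apply(recovered);
        }
        return recovered;
    }
}
```

Because an operation and its log entry are the same object, recovery reduces to a replay loop. The real translog also serializes entries to disk, which this sketch deliberately skips.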
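Since it is an engine, let's start it up before discussing anything else. ES manages its instances with Guice; to keep things direct, we simply create them with new here: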
public Engine createEngine() throws IOException {
    Index index = new Index("index");
    ShardId shardId = new ShardId(index, 1);
    ThreadPool threadPool = new ThreadPool();
    CodecService cs = new CodecService(shardId.index());
    AnalysisService as = new AnalysisService(shardId.index());
    SimilarityService ss = new SimilarityService(shardId.index());
    Translog translog = new FsTranslog(shardId, EMPTY_SETTINGS, new File("c:/fs-translog"));
    DirectoryService directoryService = new RamDirectoryService(shardId, EMPTY_SETTINGS);
    Store store = new Store(shardId, EMPTY_SETTINGS, null, directoryService,
            new LeastUsedDistributor(directoryService));
    SnapshotDeletionPolicy sdp = new SnapshotDeletionPolicy(
            new KeepOnlyLastDeletionPolicy(shardId, EMPTY_SETTINGS));
    MergeSchedulerProvider<?> scp = new SerialMergeSchedulerProvider(shardId, EMPTY_SETTINGS, threadPool);
    MergePolicyProvider<?> mpp = new LogByteSizeMergePolicyProvider(store,
            new IndexSettingsService(index, EMPTY_SETTINGS));
    IndexSettingsService iss = new IndexSettingsService(shardId.index(), EMPTY_SETTINGS);
    ShardIndexingService sis = new ShardIndexingService(shardId, EMPTY_SETTINGS,
            new ShardSlowLogIndexingService(shardId, EMPTY_SETTINGS, iss));
    Engine engine = new RobinEngine(shardId, EMPTY_SETTINGS, threadPool, iss, sis, null, store,
            sdp, translog, mpp, scp, as, ss, cs);
    return engine;
}
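With all of its encapsulation in place, ES ultimately lets you query with JSON syntax and get JSON back, but that layer does not exist yet at the engine level. Recall how CRUD works in plain Lucene, and now let's see what CRUD looks like in the ES engine. First, the Lucene Document is wrapped in a ParsedDocument: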
private ParsedDocument createParsedDocument(String uid, String id, String type, String routing,
        long timestamp, long ttl, Analyzer analyzer, BytesReference source, boolean mappingsModified) {
    Field uidField = new Field("_uid", uid, UidFieldMapper.Defaults.FIELD_TYPE);
    Field versionField = new NumericDocValuesField("_version", 0);
    Document document = new Document();
    document.add(uidField);
    document.add(versionField);
    document.add(new Field("_source", source.toBytes()));
    document.add(new TextField("name", "myname", Field.Store.NO));
    return new ParsedDocument(uidField, versionField, id, type, routing, timestamp, ttl,
            Arrays.asList(document), analyzer, source, mappingsModified);
}
Engine engine = createEngine();
engine.start();
String json = "{\"name\":\"myname\"}";
BytesReference source = new BytesArray(json.getBytes());
ParsedDocument doc = createParsedDocument("2", "myid", "mytype", null, -1, -1,
        Lucene.STANDARD_ANALYZER, source, false);
// Create
Engine.Create create = new Engine.Create(null, newUid("2"), doc);
engine.create(create);
create.version(2);
create.versionType(VersionType.EXTERNAL);
engine.create(create);
// Delete
Engine.Delete delete = new Engine.Delete("mytype", "myid", newUid("2"));
engine.delete(delete);
// Update works like create and is omitted here
// Search
TopDocs result = null;
try {
    Query query = new MatchAllDocsQuery();
    result = engine.searcher().searcher().search(query, 10);
    System.out.println(result.totalHits);
} catch (Exception e) {
    e.printStackTrace();
}
// Get: uid "1" is never indexed above, so guard before dereferencing the source
Engine.GetResult gr = engine.get(new Engine.Get(true, newUid("1")));
if (gr.exists() && gr.source() != null) {
    System.out.println(gr.source().source.toUtf8());
}
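Judging by how it is used, this little engine does not look all that impressive yet. No matter: it becomes more capable layer by layer, until it finally grows into the full-blown ES.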
We already understand the query flow, so what does the Get operation, which Lucene itself does not have, actually do? Let's take a look.
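In the best case, the data is returned straight from the translog. Otherwise the engine locates the UID term in the TermsEnum (presumably with a binary-search-style lookup) and then follows the resulting pointer to read the stored content from the .fdt file.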
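The two-step Get path can be sketched in plain Java. Everything here is hypothetical: RealtimeGet, a HashMap standing in for translog-backed entries, and a sorted String[] standing in for the UID terms dictionary. Real Lucene does not keep terms in a flat sorted array, so Arrays.binarySearch is only a stand-in for the TermsEnum lookup, and the array position doubles as the doc id for simplicity.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;

// Hypothetical model of a realtime get: translog first, index second.
final class RealtimeGet {
    // uid -> source for recent operations (stand-in for the translog path)
    final Map<String, String> translogSources = new HashMap<>();
    // Sorted uid "terms dictionary" of one segment, plus parallel stored sources
    final String[] sortedUids;
    final String[] storedSources;

    RealtimeGet(String[] sortedUids, String[] storedSources) {
        this.sortedUids = sortedUids;
        this.storedSources = storedSources;
    }

    String get(String uid) {
        String fromLog = translogSources.get(uid);
        if (fromLog != null) {
            return fromLog; // best case: served straight from the translog
        }
        int pos = Arrays.binarySearch(sortedUids, uid); // locate the uid term
        if (pos < 0) {
            return null; // document does not exist
        }
        return storedSources[pos]; // "follow the pointer" into stored fields
    }
}
```

Note that no scoring or collection happens on either path, which is why Get needs fewer steps than a TermQuery.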
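In other words, if a query is split into a query phase and a fetch phase, Get differs only in the first phase, and it takes fewer steps than a TermQuery. A simplified version of that lookup: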
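// Simplified demo code
for (AtomicReaderContext context : reader.leaves()) {
    try {
        AtomicReader leaf = context.reader();
        Terms terms = leaf.terms("brand");
        if (terms == null) {
            continue; // this segment has no "brand" field
        }
        TermsEnum te = terms.iterator(null);
        BytesRef br = new BytesRef("陌"); // encodes as UTF-8, matching the indexed terms
        if (te.seekExact(br, false)) {
            // pass liveDocs so that deleted documents are skipped
            DocsEnum docs = te.docs(leaf.getLiveDocs(), null);
            for (int d = docs.nextDoc(); d != DocsEnum.NO_MORE_DOCS; d = docs.nextDoc()) {
                // d is segment-local, so load the document from the leaf, not the top-level reader
                System.out.println(leaf.document(d).getBinaryValue("_source").utf8ToString());
            }
        }
    } catch (IOException e) {
        e.printStackTrace();
    }
}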
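The loop above works per segment, and the doc ids it sees are local to each segment. A top-level id is the segment's docBase plus the local id. The sketch below models that with hypothetical ToySegment and LeafLookup classes: each segment is a sorted term array where term i lives in local doc i (a simplification), and binary search again stands in for seekExact.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical segment: sorted terms, where term i is stored in local doc i.
final class ToySegment {
    final String[] sortedTerms;
    final int docBase; // top-level id of this segment's first document

    ToySegment(String[] sortedTerms, int docBase) {
        this.sortedTerms = sortedTerms;
        this.docBase = docBase;
    }
}

final class LeafLookup {
    // Seek the term in every leaf and collect matching top-level doc ids.
    static List<Integer> find(List<ToySegment> leaves, String term) {
        List<Integer> hits = new ArrayList<>();
        for (ToySegment leaf : leaves) {
            int local = Arrays.binarySearch(leaf.sortedTerms, term);
            if (local >= 0) {
                hits.add(leaf.docBase + local); // local id + docBase = top-level id
            }
        }
        return hits;
    }
}
```

Mixing the two id spaces, for example passing a local id to the top-level reader, silently returns the wrong document for every segment except the first. That is why the demo loads documents through the leaf reader.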
Refresh simply calls Lucene's searcherManager.maybeRefresh(). Flush comes in three types:
static class Flush {
    public static enum Type {
        /**
         * Create a new writer
         */
        NEW_WRITER,
        /**
         * Commit the writer
         */
        COMMIT,
        /**
         * Commit the translog
         */
        COMMIT_TRANSLOG
    }
    // ...
}
Refresh is lighter-weight, and by default it runs automatically once per second:
@Override
public TimeValue defaultRefreshInterval() {
    return new TimeValue(1, TimeUnit.SECONDS);
}
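A minimal sketch of a periodic, dirty-flag-driven refresh follows. The assumption, made for illustration, is that the scheduled task only reopens searchers when something was written since the last refresh, which is what the `dirty = true` assignment in create() suggests. PeriodicRefresher and its methods are hypothetical, and tick() merely counts where the engine would call searcherManager.maybeRefresh().

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical periodic refresher: refresh only if writes happened since the last tick.
final class PeriodicRefresher {
    final AtomicBoolean dirty = new AtomicBoolean(false);
    final AtomicInteger refreshes = new AtomicInteger();

    // Called by write operations (compare `dirty = true` in create()).
    void markDirty() {
        dirty.set(true);
    }

    // One scheduled tick; the counter stands in for searcherManager.maybeRefresh().
    void tick() {
        if (dirty.getAndSet(false)) { // clear the flag atomically before refreshing
            refreshes.incrementAndGet();
        }
    }

    // Schedule tick() at a fixed delay; the caller must shut the executor down.
    ScheduledExecutorService start(long intervalMillis) {
        ScheduledExecutorService s = Executors.newSingleThreadScheduledExecutor();
        s.scheduleWithFixedDelay(this::tick, intervalMillis, intervalMillis, TimeUnit.MILLISECONDS);
        return s;
    }
}
```

Calling `start(1000)` would match the one-second default. Clearing the flag with getAndSet before refreshing means a write that lands during the refresh sets the flag again and is picked up on the next tick.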
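Every operation follows roughly the same logic, so let's pick create as an example. The engine is where ES uses locks most heavily, nested layer upon layer; it would almost be surprising if nothing ever went wrong.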
@Override
public void create(Create create) throws EngineException {
    rwl.readLock().lock();
    try {
        IndexWriter writer = this.indexWriter;
        if (writer == null) {
            throw new EngineClosedException(shardId, failedEngine);
        }
        innerCreate(create, writer);
        dirty = true;
        possibleMergeNeeded = true;
        flushNeeded = true;
    } catch (IOException e) {
        throw new CreateFailedEngineException(shardId, create, e);
    } catch (OutOfMemoryError e) {
        failEngine(e);
        throw new CreateFailedEngineException(shardId, create, e);
    } catch (IllegalStateException e) {
        if (e.getMessage().contains("OutOfMemoryError")) {
            failEngine(e);
        }
        throw new CreateFailedEngineException(shardId, create, e);
    } finally {
        rwl.readLock().unlock();
    }
}

private void innerCreate(Create create, IndexWriter writer) throws IOException {
    synchronized (dirtyLock(create.uid())) {
        // ... data-version validation omitted; not covered here
        if (create.docs().size() > 1) {
            writer.addDocuments(create.docs(), create.analyzer());
        } else {
            writer.addDocument(create.docs().get(0), create.analyzer());
        }
        Translog.Location translogLocation = translog.add(new Translog.Create(create));
        //versionMap.put(versionKey, new VersionValue(updatedVersion, false, threadPool.estimatedTimeInMillis(), translogLocation));
        indexingService.postCreateUnderLock(create);
    }
}
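create() runs under the engine's read lock, while the write lock is reserved for starting or closing the engine, recovery, and NEW_WRITER flushes. The split can be sketched with java.util.concurrent's ReentrantReadWriteLock. LockedEngine is hypothetical, and a ConcurrentLinkedQueue stands in for the IndexWriter (null meaning "closed"): many creates may hold the read lock at once, but swapping or closing the writer waits until all of them have left.

```java
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

// Hypothetical engine skeleton showing the read/write lock split.
final class LockedEngine {
    private final ReadWriteLock rwl = new ReentrantReadWriteLock();
    // Stand-in for the IndexWriter; null means the engine is closed.
    private ConcurrentLinkedQueue<String> writer = new ConcurrentLinkedQueue<>();

    // Regular operations share the read lock and may run concurrently.
    void create(String doc) {
        rwl.readLock().lock();
        try {
            ConcurrentLinkedQueue<String> w = this.writer;
            if (w == null) {
                throw new IllegalStateException("engine closed");
            }
            w.add(doc); // the queue itself is thread-safe for concurrent adds
        } finally {
            rwl.readLock().unlock();
        }
    }

    // NEW_WRITER-style flush: replacing the writer needs exclusive access.
    int flushNewWriter() {
        rwl.writeLock().lock();
        try {
            if (writer == null) {
                throw new IllegalStateException("engine closed");
            }
            int flushed = writer.size();
            writer = new ConcurrentLinkedQueue<>();
            return flushed;
        } finally {
            rwl.writeLock().unlock();
        }
    }

    // Closing also takes the write lock, so no create() is in flight.
    void close() {
        rwl.writeLock().lock();
        try {
            writer = null;
        } finally {
            rwl.writeLock().unlock();
        }
    }
}
```

The read lock does not make concurrent creates safe with respect to each other; it only keeps the writer from being swapped underneath them. Per-document safety comes from the UID lock inside innerCreate().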
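First there is a read-write lock. Most operations take the read lock; the write lock is taken only when starting or closing the engine, during recovery phase 3, and during a NEW_WRITER flush. In other words, while the engine is starting or closing, recovering data, or recreating the IndexWriter, nothing else can happen. Next comes an object lock on the UID. It uses lock striping, the same principle as ConcurrentHashMap, which avoids creating a huge number of lock objects. Keep in mind that UIDs are a massive population of objects: creating a lock object (a String, say) per UID here would run out of memory in no time.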
private final Object[] dirtyLocks;

// in the constructor:
this.dirtyLocks = new Object[indexConcurrency * 50]; // by default at most 8*50 lock objects...
for (int i = 0; i < dirtyLocks.length; i++) {
    dirtyLocks[i] = new Object();
}

private Object dirtyLock(BytesRef uid) {
    int hash = DjbHashFunction.DJB_HASH(uid.bytes, uid.offset, uid.length);
    // abs returns Integer.MIN_VALUE, so we need to protect against it...
    if (hash == Integer.MIN_VALUE) {
        hash = 0;
    }
    return dirtyLocks[Math.abs(hash) % dirtyLocks.length];
}
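The striped lock table can be reproduced as a stand-alone class. StripedLocks is hypothetical, and djbHash is my own implementation of the classic DJB hash (seed 5381, h = h * 33 + b, truncated to int), not ES's DjbHashFunction. With indexConcurrency = 8, as the comment above assumes, the table holds 400 locks no matter how many UIDs exist. Math.floorMod (Java 8+) is used in place of the abs-plus-guard trick: it never returns a negative index, whereas Math.abs(Integer.MIN_VALUE) is still negative, which is why the original special-cases that value.

```java
// Hypothetical stand-alone version of the striped "dirty lock" table.
final class StripedLocks {
    private final Object[] locks;

    StripedLocks(int indexConcurrency) {
        locks = new Object[indexConcurrency * 50]; // same sizing rule as the engine code
        for (int i = 0; i < locks.length; i++) {
            locks[i] = new Object();
        }
    }

    // Classic DJB hash: start at 5381, then h = h * 33 + b for each byte.
    static int djbHash(byte[] bytes, int offset, int length) {
        long hash = 5381;
        for (int i = offset; i < offset + length; i++) {
            hash = ((hash << 5) + hash) + bytes[i];
        }
        return (int) hash;
    }

    // Equal uid bytes always map to the same lock object.
    Object lockFor(byte[] uid) {
        int hash = djbHash(uid, 0, uid.length);
        return locks[Math.floorMod(hash, locks.length)]; // floorMod is never negative
    }

    int size() {
        return locks.length;
    }
}
```

Usage mirrors innerCreate(): `synchronized (locks.lockFor(uidBytes)) { ... }`. Two different UIDs may land on the same stripe; that only serializes their writes and never breaks correctness, which is the trade-off lock striping makes for a bounded number of lock objects.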