티스토리 뷰
DocumentAPI 중에 BulkProcessor 도 존재하는데 이를 사용하여 대량의 문서들을 bulk로 생성해보도록 한다.
우선 이전 포스팅에서와 같이 생성할 문서의 내용을 BulkData 클래스로 생성한다는 전제로 작성한 코드이다.
public Boolean bulkDocumentWithBulkProcessor(ArrayList<BulkData> bulkList) {
return _bulkDocumentWithBulkProcessor(bulkList, 1000);
}
public Boolean bulkDocumentWithBulkProcessor(ArrayList<BulkData> bulkList, int bulkActions) {
return _bulkDocumentWithBulkProcessor(bulkList, bulkActions);
}
private Boolean _bulkDocumentWithBulkProcessor(ArrayList<BulkData> bulkList, int bulkActions) {
boolean terminated = false ;
BulkProcessor bulkProcessor = _getBulkProcessor(bulkActions);
for(BulkData data : bulkList) {
if(data.getActionType().equals(BulkData.Type.CREATE))
bulkProcessor.add(new IndexRequest(data.getIndexName(), data.getTypeName(), data.getId()).source(data.getJsonMap()));
else if(data.getActionType().equals(BulkData.Type.UPDATE))
bulkProcessor.add(new UpdateRequest(data.getIndexName(), data.getTypeName(), data.getId()).doc(data.getJsonMap()));
else if(data.getActionType().equals(BulkData.Type.DELETE))
bulkProcessor.add(new DeleteRequest(data.getIndexName(), data.getTypeName(), data.getId()));
}
try {
terminated = bulkProcessor.awaitClose(30L, TimeUnit.SECONDS);
log.info("BulkProcessor Finished...." + terminated);
bulkProcessor.close();
} catch (InterruptedException e) {
log.error(e);
}
return terminated;
}
private BulkProcessor _getBulkProcessor(int bulkActions ) {
BulkProcessor bulkProcessor = BulkProcessor.builder(
(request, bulkListener) ->
client.bulkAsync(request, RequestOptions.DEFAULT, bulkListener),new BulkProcessor.Listener() {
int count = 0;
@Override
public void beforeBulk(long l, BulkRequest bulkRequest) {
count = count + bulkRequest.numberOfActions();
log.info("Uploaded " + count + " so far");
}
@Override
public void afterBulk(long l, BulkRequest bulkRequest, BulkResponse bulkResponse) {
if (bulkResponse.hasFailures()) {
for (BulkItemResponse bulkItemResponse : bulkResponse) {
if (bulkItemResponse.isFailed()) {
BulkItemResponse.Failure failure = bulkItemResponse.getFailure();
log.info("Error " + failure.toString());
}
}
}
}
@Override
public void afterBulk(long l, BulkRequest bulkRequest, Throwable throwable) {
log.info("Errors " + throwable.toString());
}
})
.setBulkActions(bulkActions).setConcurrentRequests(0)
.setFlushInterval(TimeValue.timeValueSeconds(30L))
.build();
return bulkProcessor;
}
이렇게 작성한 코드는 bulkDocumentWithBulkProcessor 메소드를 호출하여 생성할 문서의 내용이 담긴 bulkList를 넘겨주면 실행이 된다.
이 때 flush할 카운트를 넘겨주면 해당 카운트마다 flush를 하게 되고 디폴트는 1000개마다 flush를 하게 된다.
테스트
@Test
public void bulkProcessor2() throws IOException {
ClientApi api = new ClientApi(ip);
DocumentApi docApi = api.getDocumentApi();
ArrayList<BulkData> bulkList = new ArrayList<BulkData>();
for(int i=1;i<10001;i++) {
BulkData data = new BulkData();
data.setActionType(Type.CREATE);
data.setIndexName("test");
data.setTypeName("_doc");
data.setId(String.valueOf(i));
Map<String, Object> jsonMap = new HashMap<String, Object>();
jsonMap.put("code", String.valueOf(i));
jsonMap.put("title", "제목 "+ String.valueOf(i));
jsonMap.put("date", new Date());
data.setJsonMap(jsonMap);
bulkList.add(data);
}
boolean result = docApi.bulkDocumentWithBulkProcessor(bulkList);
System.out.println(result);
api.close();
}
Database에 있는 내용을 select하여 Elasticsearch에 색인을 생성하도록 하는 예제도 작성해 보자.
select 구문을 통해 생성된 resultset의 내용을 BulkData 에 담아 bulkDocumentWithBulkProcessor() 메소드를 호출하도록 하면 된다.
테스트
@Test
public void bulkProcessor3() throws IOException {
ClientApi api = new ClientApi(ip);
DocumentApi docApi = api.getDocumentApi();
ConnectionDB con = new ConnectionDB();
Connection conn = con.DB("com.mysql.cj.jdbc.Driver", "url", "id", "password");
try {
conn.setAutoCommit(false);
} catch (SQLException e) {
e.printStackTrace();
}
ArrayList<BulkData> bulkList = new ArrayList<BulkData>();
Statement stmt = null;
ResultSet rs = null;
String SQL = "SELECT no, code, title FROM SD_BASE";
try {
stmt = conn.createStatement();
rs = stmt.executeQuery(SQL);
int count = 0;
BulkData data;
Map<String, Object> jsonMap;
while (rs.next()) {
String code = rs.getString("code");
String title = rs.getString("title");
data = new BulkData();
data.setActionType(Type.CREATE);
data.setIndexName("test");
data.setTypeName("_doc");
data.setId(String.valueOf(rs.getInt("no")));
jsonMap = new HashMap<String, Object>();
jsonMap.put("code", code);
jsonMap.put("title", title);
data.setJsonMap(jsonMap);
bulkList.add(data);
count++;
if(count%1000 ==0) {
boolean result = docApi.bulkDocumentWithBulkProcessor(bulkList);
System.out.println(result +" :: " + count);
bulkList.clear();
}
}
boolean result = docApi.bulkDocumentWithBulkProcessor(bulkList);
System.out.println(result +" :: " + count);
} catch (SQLException e) {
e.printStackTrace();
} finally {
try {
rs.close();
stmt.close();
conn.close();
} catch (SQLException e) {
e.printStackTrace();
}
}
api.close();
}
전체 코드는
https://github.com/joyhong85/elasticsearch-client
joyhong85/elasticsearch-client
Elasticsearch Java High Level Rest Client. Contribute to joyhong85/elasticsearch-client development by creating an account on GitHub.
github.com
에서 확인할 수 있다.
'Elasticsearch' 카테고리의 다른 글
Java High Level Rest Client 사용하기 - bulk request (0) | 2020.05.10 |
---|---|
Java High Level Rest Client 사용하기 - Document API (0) | 2020.05.09 |
Java High Level Rest Client 사용하기 - Index API (0) | 2020.05.08 |
공지사항
최근에 올라온 글
최근에 달린 댓글
- Total
- Today
- Yesterday
TAG
- 트리플 변환
- Neo4j
- sparql
- 트리플
- 타임리프
- RDF
- neosemantics
- Knowledge Graph
- cypher
- rdfox
- LOD
- Ontology
- 지식 그래프
- TBC
- 스프링부트
- 그래프 데이터베이스
- Linked Data
- 사이퍼
- stardog
- RDF 변환
- property graph
- pyvis
- 온톨로지
- TDB
- networkx
- TopBraid Composer
- 지식그래프
- Thymeleaf
- 장고
- django
일 | 월 | 화 | 수 | 목 | 금 | 토 |
---|---|---|---|---|---|---|
1 | ||||||
2 | 3 | 4 | 5 | 6 | 7 | 8 |
9 | 10 | 11 | 12 | 13 | 14 | 15 |
16 | 17 | 18 | 19 | 20 | 21 | 22 |
23 | 24 | 25 | 26 | 27 | 28 | 29 |
30 | 31 |
글 보관함