티스토리 뷰

Document API 중에는 대량의 문서들을 생성하는 bulk api도 존재한다.

bulk api는 한 번의 요청으로 여러 개의 생성/업데이트/삭제 동작을 함께 수행하도록 할 수 있다.

	/**
	 * Sends one bulk request that mixes index, update and delete operations against
	 * the "test" index and prints the outcome of every item.
	 *
	 * @throws IOException if the bulk request cannot be executed
	 */
	@Test
	public void bulk() throws IOException {
		ClientApi api = new ClientApi(ip);
		// Close the client even when the bulk call below throws.
		try {
			DocumentApi docApi = api.getDocumentApi();
			// getBulkItem() is used here as a two-element array: [0] the request, [1] the client.
			Object[] item = (Object[]) docApi.getBulkItem();
			BulkRequest request = (BulkRequest) item[0];
			RestHighLevelClient client = (RestHighLevelClient) item[1];

			request.add(new IndexRequest("test", "_doc", "1").source(XContentType.JSON,"code", "100", "title","넘버원","date",new Date()));
			request.add(new UpdateRequest("test", "_doc", "2").doc(XContentType.JSON,"code", "101", "title","넘버투","date",new Date()));
			request.add(new DeleteRequest("test", "_doc", "3"));
			request.add(new IndexRequest("test", "_doc", "4").source(XContentType.JSON,"code", "103", "title","넘버포","date",new Date()));

			BulkResponse bulkResponse = client.bulk(request, RequestOptions.DEFAULT);
			for (BulkItemResponse bulkItemResponse : bulkResponse) {
				// A failed item carries no response object; report the failure reason instead of "null".
				if (bulkItemResponse.isFailed()) {
					System.out.println(bulkItemResponse.getFailure().getMessage());
					continue;
				}

				DocWriteResponse itemResponse = bulkItemResponse.getResponse();

				// Shows how to narrow the generic response to the concrete type of each operation.
				switch (bulkItemResponse.getOpType()) {
					case INDEX:
					case CREATE:
						IndexResponse indexResponse = (IndexResponse) itemResponse;
						break;
					case UPDATE:
						UpdateResponse updateResponse = (UpdateResponse) itemResponse;
						break;
					case DELETE:
						DeleteResponse deleteResponse = (DeleteResponse) itemResponse;
						break;
					default:
						break;
				}
				System.out.println(itemResponse);
			}
		} finally {
			api.close();
		}
	}

ClientApi 클래스는 사용 편의를 위해 별도로 생성한 클래스이며 전체 코드는 

https://github.com/joyhong85/elasticsearch-client

 

joyhong85/elasticsearch-client

Elasticsearch Java High Level Rest Client. Contribute to joyhong85/elasticsearch-client development by creating an account on GitHub.

github.com

에서 확인이 가능하다.

 

bulk api의 가장 큰 특징은 IndexRequest, UpdateRequest, DeleteRequest 객체를 생성하여 BulkRequest 객체에 add() 메소드로 넣은 뒤, RestHighLevelClient 객체의 bulk() 메소드를 실행하기만 하면 된다는 점이다.

 

이러한 내용을 바탕으로 벌크로 처리할 문서의 내용을 BulkData 라는 클래스로 생성하고

이 내용을 입력받아 처리하도록 변형시킨 코드는 아래와 같다.

	/**
	 * Mutable holder for one entry of a bulk request: which document to touch
	 * (index, type, id), the document body, and the action to perform on it.
	 */
	public class BulkData {

		/** Actions an entry can request. */
		public enum Type {
			CREATE,
			UPDATE,
			DELETE
		}

		private Type actionType;
		private String indexName;
		private String typeName;
		private String id;
		private Map<String, Object> jsonMap;

		/** @return the action to perform, or {@code null} if not set yet */
		public Type getActionType() {
			return actionType;
		}

		/** @param actionType the action to perform for this entry */
		public void setActionType(Type actionType) {
			this.actionType = actionType;
		}

		/** @return the target index name */
		public String getIndexName() {
			return indexName;
		}

		/** @param indexName the target index name */
		public void setIndexName(String indexName) {
			this.indexName = indexName;
		}

		/** @return the target type name */
		public String getTypeName() {
			return typeName;
		}

		/** @param typeName the target type name */
		public void setTypeName(String typeName) {
			this.typeName = typeName;
		}

		/** @return the document id */
		public String getId() {
			return id;
		}

		/** @param id the document id */
		public void setId(String id) {
			this.id = id;
		}

		/** @return the document body as a field-name to value map */
		public Map<String, Object> getJsonMap() {
			return jsonMap;
		}

		/** @param jsonMap the document body as a field-name to value map */
		public void setJsonMap(Map<String, Object> jsonMap) {
			this.jsonMap = jsonMap;
		}
	}
	/**
	 * Builds one bulk request from the given entries and executes it.
	 *
	 * <p>Each entry is translated by its action type: {@code CREATE} becomes an
	 * {@code IndexRequest}, {@code UPDATE} an {@code UpdateRequest} and {@code DELETE}
	 * a {@code DeleteRequest}. Items that failed are logged individually.
	 *
	 * @param bulkList entries to execute; every entry must have its action type set
	 * @return the bulk response, or {@code null} when the request could not be executed
	 *         because of an I/O error (the error is logged)
	 */
	public BulkResponse bulkDocument(ArrayList<BulkData> bulkList) {

		BulkRequest request = new BulkRequest();

		for (BulkData data : bulkList) {
			switch (data.getActionType()) {
				case CREATE:
					request.add(new IndexRequest(data.getIndexName(), data.getTypeName(), data.getId()).source(data.getJsonMap()));
					break;
				case UPDATE:
					request.add(new UpdateRequest(data.getIndexName(), data.getTypeName(), data.getId()).doc(data.getJsonMap()));
					break;
				case DELETE:
					request.add(new DeleteRequest(data.getIndexName(), data.getTypeName(), data.getId()));
					break;
				default:
					break;
			}
		}

		BulkResponse response;
		try {
			response = client.bulk(request, RequestOptions.DEFAULT);
		} catch (IOException e) {
			// Without a response there is nothing to inspect; stop here instead of
			// dereferencing null in the loop below.
			log.error("bulk request failed", e);
			return null;
		}

		for (BulkItemResponse bulkItemResponse : response) {
			if (bulkItemResponse.isFailed()) {
				BulkItemResponse.Failure failure = bulkItemResponse.getFailure();
				log.warn(failure.getIndex() + " - " + failure.getType() + " - " + failure.getId() + " / " + failure.getMessage());
			}
		}
		// Log the item count; logging the array itself only prints its identity string.
		log.info("items: " + response.getItems().length);
		log.info(response.getTook());

		return response;
	}

 

테스트

	/**
	 * Indexes 100 generated documents (ids 1..100) into the "test" index through
	 * {@code bulkDocument} and prints the response status and elapsed time.
	 *
	 * @throws IOException declared by the original test signature
	 */
	@Test
	public void bulkWithBulkData() throws IOException {
		ClientApi api = new ClientApi(ip);
		// Close the client even when building or sending the bulk request fails.
		try {
			DocumentApi docApi = api.getDocumentApi();
			ArrayList<BulkData> bulkList = new ArrayList<BulkData>();
			SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'", Locale.KOREAN);
			// The pattern ends in a literal 'Z' (UTC), so the formatter must produce UTC;
			// otherwise local time would be stored labelled as UTC.
			format.setTimeZone(java.util.TimeZone.getTimeZone("UTC"));
			for (int i = 1; i < 101; i++) {
				BulkData data = new BulkData();
				data.setActionType(Type.CREATE);
				data.setIndexName("test");
				data.setTypeName("_doc");
				data.setId(String.valueOf(i));
				Map<String, Object> jsonMap = new HashMap<String, Object>();
				jsonMap.put("code", String.valueOf(i));
				jsonMap.put("title", "제목 " + i);
				jsonMap.put("date", format.format(new Date()));
				data.setJsonMap(jsonMap);
				bulkList.add(data);
			}

			BulkResponse response = docApi.bulkDocument(bulkList);

			System.out.println(response.status() + " / " + response.getTook());
		} finally {
			api.close();
		}
	}

 

 

최근에 올라온 글
최근에 달린 댓글
Total
Today
Yesterday
«   2024/05   »
1 2 3 4
5 6 7 8 9 10 11
12 13 14 15 16 17 18
19 20 21 22 23 24 25
26 27 28 29 30 31
글 보관함