04、ES实战:更新索引-updateupset与Bulk的批量更新

Java更新索引(update & upset)
update

更新使用UpdateRequest(update类型更新,只能更新)

public class EsUpdate{
   
     
    public void updateIndex(TransportClient client){
   
     
        Date time = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
                                .parse("2016-7-21 00:00:01");

        UpdateRequest updateRequest = new UpdateRequest();
        updateRequest.index("pointdata")
				     .type("pointdata")
				     .id("1")
				     .doc(XContentFactory.jsonBuilder()
							             .startObject()
							             .field("pointid","W3.UNIT1.10LBG01CP302")
						                 .field("pointvalue","0.8029")
										 .field("inputtime",
                    new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(time))
							             .endObject()
                );
		//执行修改
       UpdateResponse response1 = client.update(updateRequest).get();

		//查询修改状态,返回ok表示成功
		System.out.println(response1.status());
    }
}

upset

要用IndexRequest设定添加文档,UpdateRequest设定更新文档,设定upset执行有则修改无则更新(upset类型更新,文档不存在时创建)

public class EsUpSet{
   
     
    public void updateIndex(TransportClient client){
   
     
         Date time = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
                                .parse("2016-7-21 00:00:01");
		//添加文档
        IndexRequest request1 = new IndexRequest("pointdata", "pointData", "1")
						        .source(
								     XContentFactory.jsonBuilder()
							             .startObject()
							             .field("pointid","W3.UNIT1.10LBG01CP302")
						                 .field("pointvalue","0.8029")
										 .field("inputtime",
                    new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(time))
							             .endObject()
						        );
		
		//修改文档		        
        UpdateRequest updateRequest2 = new UpdateRequest();
        updateRequest.index("pointdata")
				     .type("pointdata")
				     .id("1")
				     .doc(XContentFactory.jsonBuilder()
							             .startObject()
							             .field("pointid","W3.UNIT1.10LBG01CP302")
						                 .field("pointvalue","0.8029")
										 .field("inputtime",
                    new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(time))
							             .endObject()
                ).upset(request1);
       
		UpdateResponce responce = client.update(request2).get();
		
        //查询修改状态,返回ok表示成功
	    System.out.println(response2.status());
    }
}

基于Bulk的批量更新(update & upset)

动态的更新一个 documents 中的任意 field(字段),包括原来没有的 field (字段)

public ResultMsg updateIndex(final String index,
			final JSONArray resultList) {
   
     
		final ResultMsg resultMsg = new ResultMsg();
		BulkRequestBuilder bulkRequest = client.prepareBulk();
		bulkRequest.setRefreshPolicy(RefreshPolicy.IMMEDIATE);

		int count = 0;
		int len = resultList.size();
		try {
   
     
			for (int i = 0; i < len; i++) {
   
     
				final XContentBuilder builder = XContentFactory.jsonBuilder()
						.startObject();
				
				JSONObject json = resultList.getJSONObject(i);
				final String id = json.getString("id");
				json.remove("id");
				
				for (String key : json.keySet()) {
   
     
					builder.field(key, json.get(key));
				}

				builder.endObject();

				// update更新
				UpdateRequestBuilder requestBuilder = client.prepareUpdate(
						index, index, id).setDoc(builder);
				
				// upset更新
				UpdateRequestBuilder requestBuilder = client.prepareUpdate(
						index, index, id).setDoc(builder).setUpsert();
				// IndexRequestBuilder requestBuilder = client.prepareIndex(
				// "sampling", indexName, id).setSource(builder);
				bulkRequest.add(requestBuilder);
				count++;

				if (count % BULK_SIZE == 0) {
   
     
					BulkResponse bulkResponse = bulkRequest.execute()
							.actionGet();
					if (bulkResponse.hasFailures()) {
   
     
						LOGGER.error("批量更新索引数据失败: " + indexName);
						LOGGER.error("批量更新索引数据失败: "
								+ bulkResponse.buildFailureMessage());

						resultMsg.setHasFailures(true);

						BulkItemResponse[] responses = bulkResponse.getItems();
						for (int k = 0; k < responses.length; k++) {
   
     
							BulkItemResponse response = responses[k];
							if (response.isFailed()) {
   
     
								ErrorDetail errorDetail = new ErrorDetail();
								errorDetail.setIndex(k + (i / BULK_SIZE));
								errorDetail.setId(response.getId());
								errorDetail
										.setMsg(response.getFailureMessage());

								resultMsg.addError(errorDetail);
							}
						}
					}

					bulkRequest = client.prepareBulk();
					bulkRequest.setRefreshPolicy(RefreshPolicy.IMMEDIATE);
					count = 0;
				}
			}

			if (count > 0) {
   
     
				BulkResponse bulkResponse = bulkRequest.execute().actionGet();
				if (bulkResponse.hasFailures()) {
   
     
					LOGGER.error("批量更新索引数据失败: " + indexName);
					LOGGER.error("批量更新索引数据失败: "
							+ bulkResponse.buildFailureMessage());

					resultMsg.setHasFailures(true);

					BulkItemResponse[] responses = bulkResponse.getItems();
					for (int k = 0; k < responses.length; k++) {
   
     
						BulkItemResponse response = responses[k];
						if (response.isFailed()) {
   
     
							ErrorDetail errorDetail = new ErrorDetail();
							errorDetail.setIndex(k + (len / BULK_SIZE));
							errorDetail.setId(response.getId());
							errorDetail.setMsg(response.getFailureMessage());

							resultMsg.addError(errorDetail);
						}
					}
				}
			}

			return resultMsg;
		} catch (Exception e) {
   
     
			LOGGER.error("批量更新索引数据失败: " + indexName);
			throw new RuntimeException("批量更新索引数据失败: " + indexName, e);
		} finally {
   
     
			bulkRequest.setRefreshPolicy(RefreshPolicy.IMMEDIATE);
		}

	}

测试:

	@Test
	public void updateIndex() {
   
     
		List<JSONObject> jsonList = new ArrayList<>();
		JSONObject json = new JSONObject();
		json.put("id", "1");
		json.put("tag", 1);
		jsonList.add(json);
		
		updateIndex("index", jsonList);
	}

版权声明:本文不是「本站」原创文章,版权归原作者所有 | 原文地址: