
1.前言
一般来说,实现分布式锁的方式有哪几种?
一:Redisson实现
二:ZK实现
其它方式好像没有了,真的是这样么?
2.实现
LockUtils
package xxxx.utils;

import lombok.extern.slf4j.Slf4j;
import org.elasticsearch.ElasticsearchStatusException;
import org.elasticsearch.action.delete.DeleteRequest;
import org.elasticsearch.action.delete.DeleteResponse;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.client.indices.CreateIndexRequest;
import org.elasticsearch.client.indices.CreateIndexResponse;
import org.elasticsearch.client.indices.GetIndexRequest;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.search.builder.SearchSourceBuilder;

import java.io.IOException;

/**
 * Lightweight distributed lock built on Elasticsearch, so that no extra dependency such as
 * redis/zk has to be introduced.
 *
 * <p>A lock is a document in {@link #LOCK_INDEX} whose {@code _id} is the lock key. Acquiring the
 * lock indexes that document and succeeds only when Elasticsearch answers {@code CREATED};
 * releasing the lock deletes the document.
 *
 * <p>Inside one JVM, every single acquire/release attempt runs under the {@code LockUtils.class}
 * monitor. The monitor is NOT held while waiting between retries, so a thread that owns a lock can
 * always release it while other threads are still retrying.
 */
@Slf4j
public class LockUtils {

    /** Name of the id field used to look a lock document up. */
    private static final String ID_FIELD = "_id";

    /** Upper bound, in seconds, of the wait between two retries. */
    private static final Integer WAIT_SECONDS = 1;

    /** {@link #WAIT_SECONDS} expressed in milliseconds, the unit {@link Thread#sleep(long)} expects. */
    private static final long MAX_WAIT_MILLIS = WAIT_SECONDS * 1000L;

    /** Name of the index that stores the lock documents. */
    private static final String LOCK_INDEX = "ee-distribute-lock";

    private static final int ZERO = 0;

    private static final int ONE = 1;

    /** Body of every lock document; only a hint for humans browsing the index. */
    private static final String DISTRIBUTED_LOCK_TIP_JSON = "{\"tip\":\"Do not delete unless deadlock occurs\"}";

    /**
     * Tries to acquire the distributed lock identified by {@code idValue}.
     *
     * <p>The lock index is created first when it does not exist yet. Then up to {@code maxRetry}
     * attempts are made: while a lock document for the key is visible the call backs off and tries
     * again; as soon as none is visible it tries to create the document and returns the outcome of
     * that single creation (a failed creation is not retried).
     *
     * @param client   RestHighLevelClient
     * @param idValue  lock key, used as the document id
     * @param maxRetry maximum number of attempts; {@code null}, zero or negative means no attempt
     * @return {@code true} when the lock was acquired; {@code false} when the attempts were used
     *         up, the creation failed, or the thread was interrupted while waiting
     */
    public static boolean tryLock(RestHighLevelClient client, String idValue, Integer maxRetry) {
        // The index only has to be checked once per call, not once per retry.
        if (!existsIndex(client, LOCK_INDEX)) {
            createEmptyIndex(client, LOCK_INDEX);
        }

        int attempts = maxRetry == null ? ZERO : maxRetry;
        for (int remaining = attempts; remaining > ZERO; remaining--) {
            // Serialise the check-then-create step inside this JVM, but never sleep under the
            // monitor: release() needs the same monitor.
            synchronized (LockUtils.class) {
                if (getCount(client, idValue) <= ZERO) {
                    return createLock(client, idValue);
                }
            }
            // No point in waiting after the last attempt.
            if (remaining > ONE && !backOff(remaining)) {
                return false;
            }
        }
        return false;
    }

    /**
     * Creates the lock document.
     *
     * @param client  RestHighLevelClient
     * @param idValue lock key, used as the document id
     * @return {@code true} only when Elasticsearch reports the document as newly {@code CREATED};
     *         {@code false} on any other status or on an I/O failure
     */
    private static boolean createLock(RestHighLevelClient client, String idValue) {
        IndexRequest indexRequest = new IndexRequest(LOCK_INDEX);
        indexRequest.id(idValue);
        indexRequest.source(DISTRIBUTED_LOCK_TIP_JSON, XContentType.JSON);
        try {
            IndexResponse response = client.index(indexRequest, RequestOptions.DEFAULT);
            return RestStatus.CREATED.equals(response.status());
        } catch (IOException e) {
            log.error("===> create distribute lock failed, idValue:{}", idValue, e);
            return false;
        }
    }

    /**
     * Releases the distributed lock identified by {@code idValue}.
     *
     * <p>Up to {@code maxRetry} delete attempts are made, backing off between them. Every failed
     * attempt, including an I/O failure, costs exactly one attempt. When Elasticsearch reports
     * that the lock document does not exist, the call stops immediately: there is nothing to
     * release, and retrying could delete a lock that another owner acquires in the meantime.
     *
     * @param client   RestHighLevelClient
     * @param idValue  lock key, used as the document id
     * @param maxRetry maximum number of attempts; {@code null}, zero or negative means no attempt
     * @return {@code true} when the lock document was deleted; {@code false} otherwise
     */
    public static boolean release(RestHighLevelClient client, String idValue, Integer maxRetry) {
        int attempts = maxRetry == null ? ZERO : maxRetry;
        for (int remaining = attempts; remaining > ZERO; remaining--) {
            RestStatus status = deleteLock(client, idValue);
            if (RestStatus.OK.equals(status)) {
                return true;
            }
            if (RestStatus.NOT_FOUND.equals(status)) {
                return false;
            }
            if (remaining > ONE && !backOff(remaining)) {
                return false;
            }
        }
        return false;
    }

    /**
     * Performs one delete of the lock document under the class monitor.
     *
     * @param client  RestHighLevelClient
     * @param idValue lock key, used as the document id
     * @return the status of the delete response, or {@code null} when the request failed with an
     *         I/O error
     */
    private static RestStatus deleteLock(RestHighLevelClient client, String idValue) {
        DeleteRequest deleteRequest = new DeleteRequest(LOCK_INDEX);
        deleteRequest.id(idValue);
        synchronized (LockUtils.class) {
            try {
                DeleteResponse response = client.delete(deleteRequest, RequestOptions.DEFAULT);
                return response.status();
            } catch (IOException e) {
                log.warn("===> release distribute lock failed, idValue:{}", idValue, e);
                return null;
            }
        }
    }

    /**
     * Waits before the next retry. The wait grows as the remaining attempts shrink:
     * {@code MAX_WAIT_MILLIS / remaining}.
     *
     * @param remaining number of attempts left including the one just made; must be positive
     * @return {@code true} when the wait completed; {@code false} when the thread was interrupted,
     *         in which case the interrupt flag is restored and the caller should stop retrying
     */
    private static boolean backOff(int remaining) {
        try {
            Thread.sleep(MAX_WAIT_MILLIS / remaining);
            return true;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            log.warn("===> interrupted while waiting to retry distribute lock operation");
            return false;
        }
    }

    /**
     * Counts the lock documents visible for the given key.
     *
     * @param client  RestHighLevelClient
     * @param idValue lock key, used as the document id
     * @return number of hits for the key; a value greater than zero means the lock is held and the
     *         caller has to retry. An I/O failure is reported as {@code 1} so that the caller
     *         treats the lock as held instead of acquiring it blindly.
     */
    private static int getCount(RestHighLevelClient client, String idValue) {
        SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
        searchSourceBuilder.query(QueryBuilders.termQuery(ID_FIELD, idValue));

        SearchRequest searchRequest = new SearchRequest();
        searchRequest.indices(LOCK_INDEX);
        searchRequest.source(searchSourceBuilder);

        try {
            SearchResponse response = client.search(searchRequest, RequestOptions.DEFAULT);
            return (int) response.getHits().getTotalHits().value;
        } catch (IOException e) {
            log.error("===> query distribute lock failed, idValue:{}", idValue, e);
            return ONE;
        }
    }

    /**
     * Creates an empty index without any field mapping.
     *
     * <p>When the creation is rejected by Elasticsearch, the index is looked up again: if it
     * exists by then (for example because another node created it concurrently) the call still
     * counts as successful, otherwise the rejection is rethrown.
     *
     * @param client    RestHighLevelClient
     * @param indexName name of the index
     * @return {@code true} when the creation was acknowledged or the index already exists;
     *         {@code false} when the request failed with an I/O error
     */
    public static boolean createEmptyIndex(RestHighLevelClient client, String indexName) {
        CreateIndexRequest request = new CreateIndexRequest(indexName);
        try {
            CreateIndexResponse createIndexResponse = client.indices().create(request, RequestOptions.DEFAULT);
            return createIndexResponse.isAcknowledged();
        } catch (ElasticsearchStatusException e) {
            if (existsIndex(client, indexName)) {
                log.info("===> distribute lock index has created");
                return true;
            }
            throw e;
        } catch (IOException e) {
            log.warn("===> create index failed, indexName:{}", indexName, e);
            return false;
        }
    }

    /**
     * Checks whether an index exists.
     *
     * @param client    RestHighLevelClient
     * @param indexName name of the index
     * @return {@code true} when the index exists
     * @throws RuntimeException when the request fails with an I/O error; the original
     *                          {@link IOException} is attached as the cause
     */
    public static boolean existsIndex(RestHighLevelClient client, String indexName) {
        GetIndexRequest request = new GetIndexRequest(indexName);
        try {
            return client.indices().exists(request, RequestOptions.DEFAULT);
        } catch (IOException e) {
            throw new RuntimeException("existsIndex exception indexName:" + indexName + "ex:" + e.getMessage(), e);
        }
    }
}
easyEs官网
https://www.easy-es.cn/pages/7ead0d/#%E7%AE%80%E4%BB%8B
可以使用我之前写的ES启动器,下面这篇文章里有分享,可以参看:
ES启动器实现及应用间fegin调用fastJson数据解析时间类型转换bug修复
https://mp.weixin.qq.com/s/E7ZckUVvC-v2nUV7AXwEaQ