redis分布式锁可以解决多个应用进程间同步操作的一致性。网上有很多资料并不能完全解决。
- 1.时间同步问题
- 2.在一个进程crash后失效时间后自动释放锁
- 3.有些多线程race condition没有考虑到
Java版本的代码参考如下
package com.wdzj.jedis.distribution.test;
import java.util.Iterator;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.commons.pool2.impl.GenericObjectPoolConfig;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;
import redis.clients.jedis.exceptions.JedisException;
/**
* Jedis实现分布式锁
*
* @author hello
*
*/
public class JedisDistributionLock {
private final static long ACCQUIRE_LOCK_TIMEOUT_IN_MS = 10 * 1000;
private final static int EXPIRE_IN_SECOND = 5;//锁失效时间
private final static long WAIT_INTERVAL_IN_MS = 100;
private final JedisPool jedisPool;
private final long acquireLocktimeoutInMS;
private final int expireInSecond;
private final long waitIntervalInMS;
private final ConcurrentMap<String, String> settedKeys;
public JedisDistributionLock(final JedisPool jedisPool,
final long acquireLocktimeout, final int expireInSecond,
final long waitIntervalInMS) {
this.jedisPool = jedisPool;
this.acquireLocktimeoutInMS = acquireLocktimeout;
this.expireInSecond = expireInSecond;
this.waitIntervalInMS = waitIntervalInMS;
this.settedKeys = new ConcurrentHashMap<String, String>();
}
public JedisDistributionLock(final JedisPool jedisPool) {
this(jedisPool, ACCQUIRE_LOCK_TIMEOUT_IN_MS, EXPIRE_IN_SECOND,
WAIT_INTERVAL_IN_MS);
}
public void lock(final String redisKey) throws Exception {
validateRedisKeyName(redisKey);
Jedis resource = null;
try {
resource = jedisPool.getResource();
long timeoutAt = currentTimeMillisFromRedis()
+ acquireLocktimeoutInMS;
boolean flag = false;
while (true) {
String expireAt = String.valueOf(currentTimeMillisFromRedis()
+ expireInSecond * 1000);
long ret = resource.setnx(redisKey, expireAt);
if (ret == 1) {
settedKeys.put(redisKey, expireAt);
flag = true;
break;
} else {
String oldExpireAt = resource.get(redisKey);
if (oldExpireAt != null
&& Long.parseLong(oldExpireAt) < currentTimeMillisFromRedis()) {
oldExpireAt = resource.getSet(redisKey, expireAt);
if (Long.parseLong(oldExpireAt) < currentTimeMillisFromRedis()) {
settedKeys.put(redisKey, expireAt);
flag = true;
break;
} else {
// loop ...
}
} else {
// loop ...
}
}
if (acquireLocktimeoutInMS <= 0
|| timeoutAt < currentTimeMillisFromRedis()) {
break;
}
try {
TimeUnit.MILLISECONDS.sleep(waitIntervalInMS);
} catch (Exception ignore) {
}
}
if (!flag) {
throw new RuntimeException("canot acquire lock now ...");
}
} catch (JedisException je) {
je.printStackTrace();
if (resource != null) {
jedisPool.returnBrokenResource(resource);
}
throw je;
} catch (Exception e) {
e.printStackTrace();
throw e;
} finally {
if (resource != null) {
jedisPool.returnResource(resource);
}
}
}
public boolean unlock(final String name) throws Exception {
validateRedisKeyName(name);
Jedis resource = null;
try {
resource = jedisPool.getResource();
resource.del(name);
settedKeys.remove(name);
return true;
} catch (JedisException je) {
je.printStackTrace();
if (resource != null) {
jedisPool.returnBrokenResource(resource);
}
return false;
} catch (Exception e) {
e.printStackTrace();
return false;
} finally {
if (resource != null) {
jedisPool.returnResource(resource);
}
}
}
public boolean unlockAll() throws Exception {
Jedis resource = null;
try {
resource = jedisPool.getResource();
Iterator<String> iter = settedKeys.keySet().iterator();
while (iter.hasNext()) {
String key = iter.next();
resource.del(key);
settedKeys.remove(key);
}
return true;
} catch (JedisException je) {
je.printStackTrace();
if (resource != null) {
jedisPool.returnBrokenResource(resource);
}
return false;
} catch (Exception e) {
e.printStackTrace();
return false;
} finally {
if (resource != null) {
jedisPool.returnResource(resource);
}
}
}
private void validateRedisKeyName(String name) {
if (name == null || "".equals(name.trim())) {
throw new IllegalArgumentException("validateKey fail.");
}
}
private Long currentTimeMillisFromRedis() throws Exception {
Jedis resource = null;
try {
resource = jedisPool.getResource();
return Long.parseLong(resource.time().get(0)) * 1000;
} catch (JedisException je) {
je.printStackTrace();
if (resource != null) {
jedisPool.returnBrokenResource(resource);
}
throw je;
} catch (Exception e) {
e.printStackTrace();
throw e;
} finally {
if (resource != null) {
jedisPool.returnResource(resource);
}
}
}
public static void main(String[] args) throws Exception {
ExecutorService executorService = Executors.newCachedThreadPool();
GenericObjectPoolConfig config = new GenericObjectPoolConfig();
config.setMaxIdle(200);
config.setMaxTotal(200);
JedisPool pool = new JedisPool(config, "192.168.8.21", 6379, 3000,
"########");
final JedisDistributionLock jedisDistributionLock = new JedisDistributionLock(
pool);
final int threadNum = 8;
final CountDownLatch countDownLatch = new CountDownLatch(threadNum);
final int reqsPerThread = 10;
final AtomicLong seq = new AtomicLong(0);
final String key = "qq";
for (int i = 0; i < threadNum; i++) {
executorService.submit(new Runnable() {
public void run() {
System.out.println(Thread.currentThread().getId()
+ " start...");
try {
for (int j = 0; j < reqsPerThread; j++) {
jedisDistributionLock.lock(key);
System.out.println(seq.incrementAndGet());
jedisDistributionLock.unlock(key);
}
} catch (Exception e) {
e.printStackTrace();
} finally {
countDownLatch.countDown();
}
}
});
}
countDownLatch.await();
System.out.println("---------------------------->" + seq.longValue());
if (threadNum * reqsPerThread == seq.longValue()) {
System.out.println("-------------ok-----------");
} else {
System.err.println("-------------err-----------");
}
}
}
参考资料
http://blog.csdn.net/alsocoderalsogeek/article/details/50888468
http://www.cnblogs.com/wuhuajun/p/5242644.html