校园网站建设方案书深圳做网站公司哪家好
在分布式系统中多个服务需要竞争同一个资源时就需要分布式锁,这里使用zookeeper的临时顺序节点来实现分布式锁。
在节点X下创建临时顺序节点,getChildren()获取节点X的所有子节点,判断当前节点是否是第一个子节点,如果是就获取锁成功了,如果不是,那么就监听当前节点的前一个节点删除watcher事件,在前一节点删除之前当前线程需要阻塞等待,前一节点删除在watcher事件处理通知当前线程获取锁成功。
实现一:
还是使用curator按照上面的逻辑先来自己实现一个简易版的
首先需要抽象出一个DistributeLock类,有两个操作获取锁和释放锁。剩下的就是存储一些加锁节点路径信息等。
锁类定义如下:
public class DistributeLock {private CuratorFramework client;private String ROOT_PATH = "/test_lock";//顺序节点的父节点private String lockpath;//当前创建顺序节点的路径(全路径)private String currPath;//线程等待latchprivate CountDownLatch latch = new CountDownLatch(1);public DistributeLock(CuratorFramework client,String lockpath){this.client = client;this.lockpath = lockpath;}public boolean lock(){try {//创建临时顺序节点currPath = client.create().withMode(CreateMode.EPHEMERAL_SEQUENTIAL).forPath(ROOT_PATH+"/"+lockpath);String lockId = currPath.substring(currPath.lastIndexOf("/")+1);List<String> children = client.getChildren().forPath(ROOT_PATH);Collections.sort(children);//当前节点是顺序节点中第一个,获锁成功if(currPath.endsWith(children.get(0))){System.out.println(Thread.currentThread().getName() +"get lock0");return true;}/*** 不是第一个,监听排在自己前面节点的删除事件*///获取当前顺序节点前一节点的索引int preIndex = Collections.binarySearch(children,lockId)-1;//前一节点是否存在Stat stat = client.checkExists().forPath(ROOT_PATH+"/"+children.get(preIndex));System.out.println(ROOT_PATH+"/"+children.get(preIndex));System.out.println(stat==null);if(stat != null){//设置节点监听CuratorCache cache = CuratorCache.build(client,ROOT_PATH+"/"+ children.get(preIndex));cache.listenable().addListener(new CuratorCacheListener() {@Overridepublic void event(Type type, ChildData childData, ChildData childData1) {if(Type.NODE_DELETED.equals(type))latch.countDown();}});cache.start();}else{return true;}//等到前节点删除事件发生latch.await();System.out.println(Thread.currentThread().getName() +"get lock3");} catch (Exception e) {e.printStackTrace();return false;}return false;}public void unlock(){try {//删除当前节点client.delete().forPath(currPath);System.out.println(Thread.currentThread().getName() +"release lock");client.close();} catch (Exception e) {e.printStackTrace();}}
这里使用了CountDownLatch来进行线程阻塞,只是用来实现逻辑,很多细节没有考虑。比如可重入异常控制等。
然后使用这个分布式锁来控制线程执行:
//定义一个竞争资源
private final static AtomicInteger stock = new AtomicInteger(10);
ExecutorService executor = Executors.newFixedThreadPool(5);
for (int i = 0; i < 10; i++) {executor.submit(new Runnable() {@Overridepublic void run() {CuratorFramework client = CuratorFrameworkFactory.newClient("localhost:2181", new ExponentialBackoffRetry(1000, 3));client.start();DistributeLock lock = new DistributeLock(client,"stock");lock.lock();int value = stock.decrementAndGet();System.out.println("stock change to:"+value);try {Thread.sleep(new Random().nextInt(2000));} catch (InterruptedException e) {e.printStackTrace();}lock.unlock();}});
}
executor.shutdown();
定义一个竞争资源stock,多个线程对该资源进行操作,一次只允许一个线程进行操作。
实现二:
在curator的recipes包里同样提供了工具类InterProcessMutex用来获取互斥锁。
//初始化锁,需要client连接和锁路径参数
InterProcessMutex mutexLock = new InterProcessMutex(client,"/mutex_lock");
//获取锁 阻塞等待,有重构方法可以设置等待时间
mutexLock.acquire();
//do sth
//释放锁
mutexLock.release();
他这里的锁就是可重入锁。一个线程acquire要对应同等量的release。