成都汇智知了堂IT培训机构
IT培训课程升级
IT培训机构知了堂联系方式

成都Java培训之Curator框架实现分布式锁

Curator简介

成都Java培训之Curator框架实现分布式锁

目前市面上实现分布式锁还是有很多种方案,zookeeper就是其中一种,但是原生的api特别难用,所以就有curator框架诞生了,它就是对zookeeper原生api的一种封装。 curator提供的是流式编程风格,做的非常不错,是目前使用率最高的一种zookeeper框架。实现起来也是非常的简单。

1、引入 pom依赖

在这里要注意一下,这里pom中使用的zk版本最好与zk连接中使用的版本一致

<!--zookeeper的客户端依赖-->

<dependency>

<groupId>org.apache.zookeeper</groupId>

<artifactId>zookeeper</artifactId>

<version>3.5.9</version>

</dependency>

<!--curator-->

<dependency>

<groupId>org.apache.curator</groupId>

<artifactId>curator-recipes</artifactId>

<version>4.0.1</version>

</dependency>

成都Java培训之Curator框架实现分布式锁

2、具体实现

使用curator实现分布式锁主要有三个步骤:

  • 创建锁对象:new InterProcessMutex(),通过这个类获取的锁对像是可重入的;

  • 获取锁对象:执行acquire()方法获取锁;

  • 释放锁对象:调用release()方法释放锁;

public class TestLock2 {

    // zk临时节点的父目录

    private static final String LOCK_PATH = "/testLock";

    // 客户端的数量

    private static final int CLIENT_NUMS = 3;

    // 线程堵塞计数器

    private static CountDownLatch countDownLatch = new CountDownLatch(CLIENT_NUMS);

    @Test

    public void myTest2() {

        for (int i = 0; i < CLIENT_NUMS; i++) {

            new Thread(new Runnable() {

                @Override

                public void run() {

                    // 重试策略(重试时间,重试次数)

                    RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);

                    // 获取连接

                    CuratorFramework client = CuratorFrameworkFactory.newClient("127.0.0.1:2181", retryPolicy);

                    client.start();

                    try {

                        // 可重入锁,可以加入重入操作执行

                        final InterProcessMutex lock = new InterProcessMutex(client, LOCK_PATH);

                        // 可让每个客户端请求多次锁资源进行测试

                        for (int j = 1; j <= 1; j++) {

                            try {

                                // 调用acquire获取锁成功后会在/testLock下创建临时节点,节点名称类似与:_c_723c6b99-f0ec-4539-ada2-3e17a48746ed-lock-0000000309

                                if (lock.acquire(10, TimeUnit.SECONDS)) {

                                    System.out.println(Thread.currentThread().getName() + " get lock 1 ...");

                                    // 模拟业务操作

                                    Thread.sleep(10);

                                    System.out.println(Thread.currentThread().getName() + " do 1 ... ");

                                } else {

                                    throw new IllegalStateException(Thread.currentThread().getName() + " get lock 1 timeout ");

                                }

                                // 获取可重入锁

                                if (lock.acquire(10, TimeUnit.SECONDS)) {

                                    System.out.println(Thread.currentThread().getName() + " get lock 2 ...");

                                    Thread.sleep(10);

                                    System.out.println(Thread.currentThread().getName() + " do 2 ... ");

                                } else {

                                    throw new IllegalStateException(Thread.currentThread().getName() + " get lock 2 timeout");

                                }

                            } finally {

                                // 在finally中释放锁,申请几次释放几次,调用release释放锁删除acquire产生的临时节点

                                System.out.println(Thread.currentThread().getName() + " release lock 2");

                                lock.release();

                                System.out.println(Thread.currentThread().getName() + " release lock 1");

                                lock.release();

                            }

                        }

                    } catch (Exception e) {

                        e.printStackTrace();

                    } finally {

                        CloseableUtils.closeQuietly(client);

                        countDownLatch.countDown();

                    }

                }

            }).start();

        }

        // 阻塞等待线程计数器归零

        try {

            countDownLatch.await();

        } catch (InterruptedException e) {

            e.printStackTrace();

        }

        System.out.println("end ...");

    }

// 测试结果

/*

Thread-2 get lock 1 ...

Thread-2 do 1 ...

Thread-2 get lock 2 ...

Thread-2 do 2 ...

Thread-2 release lock 2

Thread-2 release lock 1

2021-01-25 13:36:31,874 [myid:] - INFO  [Curator-Framework-0:CuratorFrameworkImpl@937] - backgroundOperationsLoop exiting

Thread-1 get lock 1 ...

Thread-1 do 1 ...

Thread-1 get lock 2 ...

Thread-1 do 2 ...

Thread-1 release lock 2

Thread-1 release lock 1

2021-01-25 13:36:31,914 [myid:] - INFO  [Curator-Framework-0:CuratorFrameworkImpl@937] - backgroundOperationsLoop exiting

Thread-0 get lock 1 ...

Thread-0 do 1 ...

Thread-0 get lock 2 ...

Thread-0 do 2 ...

Thread-0 release lock 2

Thread-0 release lock 1

2021-01-25 13:36:31,963 [myid:] - INFO  [Curator-Framework-0:CuratorFrameworkImpl@937] - backgroundOperationsLoop exiting

2021-01-25 13:36:32,010 [myid:] - INFO  [Thread-2:ZooKeeper@1422] - Session: 0x1008587bbf70003 closed

2021-01-25 13:36:32,010 [myid:] - INFO  [Thread-2-EventThread:ClientCnxn$EventThread@524] - EventThread shut down for session: 0x1008587bbf70003

2021-01-25 13:36:32,109 [myid:] - INFO  [Thread-0:ZooKeeper@1422] - Session: 0x1008587bbf70004 closed

2021-01-25 13:36:32,109 [myid:] - INFO  [Thread-0-EventThread:ClientCnxn$EventThread@524] - EventThread shut down for session: 0x1008587bbf70004

2021-01-25 13:36:32,209 [myid:] - INFO  [Thread-1:ZooKeeper@1422] - Session: 0x1008587bbf70005 closed

end ...

*/

}


Curator主要从以下几个方面降低了zk使用的复杂性:

成都Java培训之Curator框架实现分布式锁






实战教学·项目驱动

177 1362 3990
预约免费试学
点击咨询
预约试学