Skip to content

服务器托管,北京服务器托管,服务器租用-价格及机房咨询

Menu
  • 首页
  • 关于我们
  • 新闻资讯
  • 数据中心
  • 服务器托管
  • 服务器租用
  • 机房租用
  • 支持中心
  • 解决方案
  • 联系我们
Menu

JUC同步锁原理源码解析六—-Exchanger

Posted on 2023年9月19日2023年9月19日 by hackdl

JUC同步锁原理源码解析六—-Exchanger

Exchanger

Exchanger的来源

A synchronization point at which threads can pair and swap elements within pairs.  Each thread presents some object on entry to the {@link #exchange exchange} method, matches with a partner thread, and receives its partner's object on return. An Exchanger may be viewed as a bidirectional form of a {@link SynchronousQueue}.

​ JDK中对Exchanger的定义是在一个同步线程点,配对的线程可以交换彼此的属性数据。每一个线程提交对象数据并调用exchange方法,匹配到一个线程并接受该线程携带的数据返回。Exchanger可以被当成一个双向的同步队列。当然Exchanger并不是说只有两个线程进行匹配,它也可以进行多对多的匹配,但是只有成对的线程可以匹配并交换数据成功。

Exchanger的底层实现

​ Exchanger的底层实现依旧依赖于CAS的自旋锁操作,通过cas保证原子性的操作

2.Exchanger

基本使用

public class ExchangerDemo {
    static Exchanger exchanger = new Exchanger();
    public static void main(String[] args) {
        new Thread(()->{
            String s = "T1";
            try {
                s = exchanger.exchange(s);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println(Thread.currentThread().getName() + " " + s);
        }, "t1").start();


        new Thread(()->{
            String s = "T2";
            try {
                s = exchanger.exchange(s);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println(Thread.currentThread().getName() + " " + s);

        }, "t2").start();
    }
}

Exchanger类

public class Exchanger {
    /**
     * The byte distance (as a shift value) between any two used slots
     * in the arena.  1 = (MMASK >> 1;

    /**
     * The bound for spins while waiting for a match. The actual
     * number of iterations will on average be about twice this value
     * due to randomization. Note: Spinning is disabled when NCPU==1.
     */
    private static final int SPINS = 1  {
        public Node initialValue() { return new Node(); }
    }

    /**
     * Per-thread state
     */
    private final Participant participant;

    /**
     * Elimination array; null until enabled (within slotExchange).
     * Element accesses use emulation of volatile gets and CAS.
     */
    private volatile Node[] arena;

    /**
     * Slot used until contention detected.
     */
    private volatile Node slot;

    /**
     * The index of the largest valid arena position, OR'ed with SEQ
     * number in high bits, incremented on each update.  The initial
     * update from 0 to SEQ is used to ensure that the arena array is
     * constructed only once.
     */
    private volatile int bound;
}

Node类

@sun.misc.Contended static final class Node {//@sun.misc.Contended 进行缓存行填充,防止数据移植刷新缓存行,造成性能损耗
    int index;              // Arena index
    int bound;              // Last recorded value of Exchanger.bound
    int collides;           // Number of CAS failures at current bound
    int hash;               // Pseudo-random for spins
    Object item;            // This thread's current item
    volatile Object match;  // Item provided by releasing thread
    volatile Thread parked; // Set to this thread when parked, else null
}

Exchanger的构造器

public Exchanger() {
    participant = new Participant();
}

exchange()方法

public V exchange(V x) throws InterruptedException {
    Object v;
    Object item = (x == null) ? NULL_ITEM : x; // 将对象赋值给item
    if ((arena != null ||//表示有多个线程在竞争匹配
         (v = slotExchange(item, false, 0L)) == null) &&//slot匹配的对象返回空,slot表示的一对一的匹配。
        ((Thread.interrupted() || // 线程是否发生中断
          (v = arenaExchange(item, false, 0L)) == null)))//arena表示发生多线程竞争,如果匹配失败
        throw new InterruptedException();//抛出中断异常
    return (v == NULL_ITEM) ? null : (V)v;//返回匹配后并交换的数据
}

exchange(V x, long timeout, TimeUnit unit)方法:

public V exchange(V x, long timeout, TimeUnit unit)
    throws InterruptedException, TimeoutException {
    Object v;
    Object item = (x == null) ? NULL_ITEM : x;// 将对象赋值给item
    long ns = unit.toNanos(timeout);//超时的时间换算成纳秒
    if ((arena != null ||
         (v = slotExchange(item, true, ns)) == null) &&//slot匹配的对象返回空,slot表示的一对一的匹配。
        ((Thread.interrupted() ||// 线程是否发生中断
          (v = arenaExchange(item, true, ns)) == null)))//arena表示发生多线程竞争
        throw new InterruptedException();//抛出中断异常
    if (v == TIMED_OUT)//如果返回对象是超时
        throw new TimeoutException();//抛出超时异常
    return (v == NULL_ITEM) ? null : (V)v;//返回匹配后并交换的数据
}

slotExchange方法

private final Object slotExchange(Object item, boolean timed, long ns) {
    Node p = participant.get();//获取参与者节点
    Thread t = Thread.currentThread();//获取当前线程
    if (t.isInterrupted()) // 判断是否发生中断
        return null;//中断唤醒直接返回

    for (Node q;;) {
        if ((q = slot) != null) {//表示当前属于slot的匹配,也即不是多对多的情况
            if (U.compareAndSwapObject(this, SLOT, q, null)) {//cas将当前slot置空
                Object v = q.item;//获取节点q的item对象
                q.match = item;//将item复制给节点的match,这里表示匹配成功
                Thread w = q.parked;//获取当前线程阻塞在slot中等待的线程
                if (w != null)//如果w线程不为空
                    U.unpark(w);//唤醒w线程
                return v;//返回对象V
            }
            // create arena on contention, but continue until slot null
            //走到这里表示竞争激烈,创建arena数组
            if (NCPU > 1 && bound == 0 &&//如果CPU大于1并且bound为0表示bound前置判断,为0标识没创建arena
                U.compareAndSwapInt(this, BOUND, 0, SEQ))//cas设置BOUND的值为序列号,同时保证多个线程的条件下,只有一个线程可以创建成功
                arena = new Node[(FULL + 2) = (MMASK >> 1; MMASK = 0xff ; ASHIFT = 7  full按照CPU进行取值,如果CPU大于510,直接取MMASK,否则CPU数/2。 ASHIFT的值为7。所以2的7次方为128。也即可以保证缓存行对齐的大小
        }
        else if (arena != null)//走到这里表示竞争激烈,升级成多对多的匹配,所以直接返回调用arenaExchange的匹配
            return null; // caller must reroute to arenaExchange
        else {//这里表示当前线程找不到可以匹配的,所以将自身放到slot中等待后续匹配
            p.item = item;//将item放到p.item中
            if (U.compareAndSwapObject(this, SLOT, null, p))//将SLOT节点置为p节点
                break;//退出
            p.item = null;//cas失败将item置为空,继续循环
        }
    }

    // await release 走到这里的条件是,当前线程将自身cas放入到slot中成功后break退出上一个循环
    int h = p.hash;//获取p节点的hash值
    long end = timed ? System.nanoTime() + ns : 0L;//如果有超时时间,计算超时时间
    int spins = (NCPU > 1) ? SPINS : 1;//SPINS = 1  0) {//进行自旋
            h ^= h >> 3; h ^= h >> 1) - 1)) == 0)//自旋到一定次数,让出CPU执行权。
                Thread.yield();
        }
        else if (slot != p)//如果slot不等p继续自旋,因为竞争强烈,很快就可以匹配成功
            spins = SPINS;
        else if (!t.isInterrupted() && arena == null &&//判断线程是否中断并且arena是否为空
                 (!timed || (ns = end - System.nanoTime()) > 0L)) {//超时判断没有超时
            U.putObject(t, BLOCKER, this);//将BLOCKER置为当前对象
            p.parked = t;//parked为当前线程
            if (slot == p)//slot等于p节点
                U.park(false, ns);//阻塞等待
            p.parked = null;//再次唤醒后,将p.parked的线程置为空
            U.putObject(t, BLOCKER, null);//阻塞对象也置为空
        }
        else if (U.compareAndSwapObject(this, SLOT, p, null)) {//cas将slot的p节点置为空
            v = timed && ns 

arenaExchange方法

private final Object arenaExchange(Object item, boolean timed, long ns) {
    Node[] a = arena;//获取到arena的数组
    Node p = participant.get();//获取到节点p
    for (int i = p.index;;) { //获取index的下标
        int b, m, c; long j;                       // j is raw array offset
        Node q = (Node)U.getObjectVolatile(a, j = (i  0) {//匹配不成功,进行自旋等待
                        h ^= h >> 3; h ^= h >> 1) - 1)) == 0)//h 0L)) {//超时时间是否大于0
                        U.putObject(t, BLOCKER, this); // 设置blocker阻塞对象为当前对象
                        p.parked = t;//parked的阻塞线程为当前线程
                        if (U.getObjectVolatile(a, j) == p)//判断当前arena的下标是否为p
                            U.park(false, ns);//阻塞等待
                        p.parked = null;//唤醒后将parked置为空
                        U.putObject(t, BLOCKER, null);//唤醒后将BLOCKER置为空
                    }
                    else if (U.getObjectVolatile(a, j) == p &&//如果当前arena的j下标处为p
                             U.compareAndSwapObject(a, j, p, null)) {//cas将j下标的p节点置为空
                        if (m != 0) //m表示最大的arena数组不等于0
                            U.compareAndSwapInt(this, BOUND, b, b + SEQ - 1);//将数组缩小
                        p.item = null;//将item置为空
                        p.hash = h;//h赋值给p.hash
                        i = p.index >>>= 1;//将index下标右移1位
                        if (Thread.interrupted())//判断是否是中断唤醒
                            return null;//返回空
                        if (timed && m == 0 && ns 

4.留言

​ 到了这里,并发工具包常用的原子性工具类已经结束了,LockSupport由于直接调用底层的park方法,较为复杂,设计到JVM的源码,暂时能力有限,后续如果看懂了在进行更新

服务器托管,北京服务器托管,服务器租用 http://www.fwqtg.net
机房租用,北京机房租用,IDC机房托管, http://www.fwqtg.net

相关推荐: nginx相关知识

目录 一. Nginx目录结构 二. Nginx配置文件结构 三. Nginx具体应用  1. 部署静态资源 2. 反向代理 3. 负载均衡 一. Nginx目录结构 重点目录/文件: conf/nginx.conf    nginx配置文件 html  存放…

Related posts:

  1. 租用新余机柜
  2. 成都服务器托管租用:专业稳定,高效安全。
  3. [oeasy]python0082_颜色设置
  4. 6g服务器托管
  5. 闵行区服务器托管服务详解

服务器托管,北京服务器托管,服务器租用,机房机柜带宽租用

服务器托管

咨询:董先生

电话13051898268 QQ/微信93663045!

上一篇: leetcode 165 比较版本号 compare-version-numbers【ct】
下一篇: chatGPT 学习笔记 学习笔记:chatGPT

最新更新

  • 实时数仓混沌演练实践
  • Matlab快速入门——矩阵的高级学习
  • 用OLED屏幕播放视频(2): 为OLED屏幕开发I2C驱动
  • OverTheWire攻关过程-Natas模块22
  • Swift中的异步编程方式

随机推荐

  • 美国托管服务器多少钱
  • 推荐的xp服务器托管
  • 河北省服务器托管
  • 保密协议书:服务器托管的关键
  • ChatGPT 打字机效果原理

客服咨询

  • 董先生
  • 微信/QQ:93663045
  • 电话:13051898268
  • 邮箱:dongli@hhisp.com
  • 地址:北京市石景山区重聚园甲18号2层

友情链接

  • 服务器托管
  • 机房租用托管
  • 服务器租用托管
©2023 服务器托管,北京服务器托管,服务器租用-价格及机房咨询 京ICP备13047091号-8