0%

现象

producer发送消息超时

1
2
3
4
5
6
7
8
9
10
11
2021-03-05 10:06:47.083 [pool-5-thread-3] ERROR com.alibaba.otter.canal.kafka.CanalKafkaProducer - java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.TimeoutException: Expiring 1 record(s) for boss-clearing.clear_task_execute_record-0: 30094 ms has passed since batch creation plus linger time
java.lang.RuntimeException: java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.TimeoutException: Expiring 1 record(s) for boss-clearing.clear_task_execute_record-0: 30094 ms has passed since batch creation plus linger time
at com.alibaba.otter.canal.kafka.CanalKafkaProducer.produce(CanalKafkaProducer.java:215) ~[canal.server-1.1.4.jar:na]
at com.alibaba.otter.canal.kafka.CanalKafkaProducer.send(CanalKafkaProducer.java:179) ~[canal.server-1.1.4.jar:na]
at com.alibaba.otter.canal.kafka.CanalKafkaProducer.send(CanalKafkaProducer.java:117) ~[canal.server-1.1.4.jar:na]
at com.alibaba.otter.canal.server.CanalMQStarter.worker(CanalMQStarter.java:183) [canal.server-1.1.4.jar:na]
at com.alibaba.otter.canal.server.CanalMQStarter.access$500(CanalMQStarter.java:23) [canal.server-1.1.4.jar:na]
at com.alibaba.otter.canal.server.CanalMQStarter$CanalMQRunnable.run(CanalMQStarter.java:225) [canal.server-1.1.4.jar:na]
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [na:1.8.0_161]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [na:1.8.0_161]
at java.lang.Thread.run(Thread.java:748) [na:1.8.0_161]

问题分析

  1. 网络连接问题,导致超时
  2. kafka故障

通过查看kafka服务状态,确认是kafka问题
同时,服务状态显示,400+ partition offline

到这里,可以肯定kafka集群出了故障。
但是,按理说,kafka是分布式服务。topic分partition,partition又有replica。单个broker故障,数据完整性不会受影响。replica的数据完全能顶上。


服务状态查看

从网页看,发现under replicated partition,即分区在复制中。

下面,我们具体看看topic分区情况
./kafka-topics.sh --describe --zookeeper xx-xx-01:2181 --under-replicated-partitions

示意:ISR即 in-sync replica
假设,有数据Topic: __consumer_offsets Partition: 6 Leader: 146 Replicas: 222,146,225 Isr: 146,222
数据一共3份,但是同步中的只有2份。可知broker 225故障。

此处,我通过重启225的kafka服务恢复。

为什么一个备份故障,服务就不可用

kafka topic有参数acks,可选值0,1,2….all
意思是,几个副本保存成功,认为ack成功

当broker故障,topic isr数量变少。可能存在一种情况: 同步中的副本数 < 提交成功需要的副本数

GG


参考资料:
Apche Kafka 的生与死 – failover 机制详解

此评分卡非彼评分卡

以逻辑回归跑出逾期概率

概率值转评分

参考文献:

java应用程序启动时,我们常常会加上 -Xmx 选项,以确保进程不会吃掉我们限制的范围。

然后你可能收到这样的惊喜。当打开命令行工具,查看 top 信息。会发现应用吃掉的内存[resident memory usage(RES)]超过了我们的预设。

为什么进程消耗的内存超过了我们分配?是bug还是完全正常的现象?

首先,一部分原因可能是代码造成的内存泄漏。但是,99%的情况下,都是jvm的正常行为。因为 -Xmx 仅仅是限制了应用程序的堆大小。

除了堆,这里还有其他几个内存空间被应用程序使用,比如 permgenstack 。为了限制它们,我们需要额外指定几个参数 -XX:MaxPermSize-Xss

简而言之,我们可以通过如下公式,预测我们应用的内存占用:

1
Max memory = [-Xmx] + [-XX:MaxPermSize] + number_of_threads * [-Xss]

但是,除应用外。jvm本身也要消耗内存:

  • 垃圾收集
    • java是一门自动垃圾回收语言,自动回收程序跟踪对象,执行回收需要占用内存
    • G1更是出了名的倾向于过多占用内存
  • 即时编译(JIT)
    • 虚拟机为了提升代码执行效率
    • 需要跟踪代码执行
  • 堆外内存
    • 使用直接或映射的 ByteBuffers 或者三方工具。在无意中拓展了堆空间,这些空间不被java虚拟机控制
  • 本地接口(JNI)
    • 使用本地代码,就要占用本地内存
  • 元空间(Metaspace)
    • java8中新增的空间,取代永久代保存类定义信息。

起因

cloudera集群邮件告警,zookeeper server离线。

第一次处理,无脑重启集群。因为白天没什么任务,直接重启可以解决部分资源紧张问题。

第一次处理结果,失败。

原因复查

重启不能解决,那么就是真的有问题了。

首先,我们检查了集群角色日志,发现out of memory错误。

显然应用内存爆了……


第二次处理,在Cloudera manager中修改zookeeper配置,添加内存到2G。

第二次处理结果,失败。

内存从1G调整到2G,直接double,没道理再崩啊 0-0

dump分析

没办法,拉出zk服务的oom时的堆转储文件

可以看到,起初的dump文件800+M;后来的dump文件1.6+G

神奇,挂的很完美


在序列化当前事务时出错。


通过java visual vm查看,数组扩容时出错。跟进去看


输出流写出大量数据,导致数组扩容。申请不到足够内存。

那么问题来了,zk上,通过只会存储极少量注册信息。怎么会有这么多数据。

zk数据检查

zk服务已经宕机,我们无法通过寻常方式检查zk节点信息。

如此,我们想到zk数据存储在哪里呢?

答案是:datadir指定的目录,配置在zoo.cfg中


我们观察到,数据目录下,同时有zk日志文件、快照文件。

可以看到,快照文件大小,无比接近dump文件。进一步确认数据量爆炸。

快照文件查看方式如下,

java -cp /opt/cloudera/parcels/CDH-6.0.0-1.cdh6.0.0.p0.537114/lib/zookeeper/lib/log4j.jar:/opt/cloudera/parcels/CDH-6.0.0-1.cdh6.0.0.p0.537114/lib/zookeeper/build/*:/opt/cloudera/parcels/CDH-6.0.0-1.cdh6.0.0.p0.537114/lib/zookeeper/build/lib/*:/opt/cloudera/parcels/CDH-6.0.0-1.cdh6.0.0.p0.537114/lib/zookeeper/*:/opt/cloudera/parcels/CDH-6.0.0-1.cdh6.0.0.p0.537114/lib/zookeeper/lib/*:/opt/cloudera/cm/lib/plugins/event-publish-6.0.0-shaded.jar:/opt/cloudera/cm/lib/plugins/tt-instrumentation-6.0.0.jar org.apache.zookeeper.server.SnapshotFormatter snapshot.3000540f8 | more

配合grep/awk/sort等,我们发现,集群上有近10000个数据节点。

其中有部分节点比较大,达到0.3M。

进一步观察,较大的这些文件,都是hive share lock。


集群中一直有hive在运行,但是从没发现hive使用了如此多的zk资源。属实意外!!!

hive事务实现

穿插一点hive:hive事务由共享锁、排它锁实现。在我锁使用的版本中,锁被保存的zk。


询问可能使用hive的同事,事发时间确实有跑脚本。


那么问题基本定位,gps位置信息在hive中分区存储,拥有大量分区表。

同事的sql使用了这些分区表,所以zk中写入了大量hive共享锁信息(估计近4000)

zk重启

调大zk内存到5G,重启

可以看到follower从leader同步快照文件。由于文件较大,同步慢慢不结束。


这里方法有点暴力,生产不建议操作

强制重新初始化zk数据目录zookeeper-server-init,重启zk

最后的处理

关闭hive事务(当前业务上刚好没用到该特性)

重启所有服务

精简篇幅,本篇暂不讨论为java stream服务的特性

1. Interfaces

  • Map<K,V>
  • Cloneable
  • Serializable

2. Abstract super class

  • AbstractMap<K,V>

3. Static fields

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
/**
* The default initial capacity - MUST be a power of two.
*/
// 初始化容量, 默认16
static final int DEFAULT_INITIAL_CAPACITY = 1 << 4; // aka 16

/**
* The maximum capacity, used if a higher value is implicitly specified
* by either of the constructors with arguments.
* MUST be a power of two <= 1<<30.
*/
// 最大容量,默认最大int
static final int MAXIMUM_CAPACITY = 1 << 30;

/**
* The load factor used when none specified in constructor.
*/
// 负载因子,触发resize的阀值,默认0.75
static final float DEFAULT_LOAD_FACTOR = 0.75f;

/**
* The bin count threshold for using a tree rather than list for a
* bin. Bins are converted to trees when adding an element to a
* bin with at least this many nodes. The value must be greater
* than 2 and should be at least 8 to mesh with assumptions in
* tree removal about conversion back to plain bins upon
* shrinkage.
*/
// 链表转红黑树的阀值(防止hash碰撞过于严重)
static final int TREEIFY_THRESHOLD = 8;

/**
* The bin count threshold for untreeifying a (split) bin during a
* resize operation. Should be less than TREEIFY_THRESHOLD, and at
* most 6 to mesh with shrinkage detection under removal.
*/
// 红黑树转链表的阀值
static final int UNTREEIFY_THRESHOLD = 6;

/**
* The smallest table capacity for which bins may be treeified.
* (Otherwise the table is resized if too many nodes in a bin.)
* Should be at least 4 * TREEIFY_THRESHOLD to avoid conflicts
* between resizing and treeification thresholds.
*/
// 转红黑树的前提阀值(先扩容table,再扩容bin)
static final int MIN_TREEIFY_CAPACITY = 64;

4. class fields

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
/**
* The table, initialized on first use, and resized as
* necessary. When allocated, length is always a power of two.
* (We also tolerate length zero in some operations to allow
* bootstrapping mechanics that are currently not needed.)
*/
// hashMap数据存储的地方
transient Node<K,V>[] table;

/**
* Holds cached entrySet(). Note that AbstractMap fields are used
* for keySet() and values().
*/
// hashMap数据访问对象(视图/缓存实例),依赖HashIterator实现entry访问
transient Set<Map.Entry<K,V>> entrySet;

/**
* The number of key-value mappings contained in this map.
*/
// map元素数量
transient int size;

/**
* The number of times this HashMap has been structurally modified
* Structural modifications are those that change the number of mappings in
* the HashMap or otherwise modify its internal structure (e.g.,
* rehash). This field is used to make iterators on Collection-views of
* the HashMap fail-fast. (See ConcurrentModificationException).
*/
// map修改次数,包括put\remove
// 在迭代map数据前后,做双重检查。并发访问,抛出运行时异常
transient int modCount;

/**
* The next size value at which to resize (capacity * load factor).
*
* @serial
*/
// (The javadoc description is true upon serialization.
// Additionally, if the table array has not been allocated, this
// field holds the initial array capacity, or zero signifying
// DEFAULT_INITIAL_CAPACITY.)
// 下次扩容阀值
int threshold;

/**
* The load factor for the hash table.
*
* @serial
*/
// 扩容因子
final float loadFactor;

/**
* Each of these fields are initialized to contain an instance of the
* appropriate view the first time this view is requested. The views are
* stateless, so there's no reason to create more than one of each.
*
* <p>Since there is no synchronization performed while accessing these fields,
* it is expected that java.util.Map view classes using these fields have
* no non-final fields (or any fields at all except for outer-this). Adhering
* to this rule would make the races on these fields benign.
*
* <p>It is also imperative that implementations read the field only once,
* as in:
*
* <pre> {@code
* public Set<K> keySet() {
* Set<K> ks = keySet; // single racy read
* if (ks == null) {
* ks = new KeySet();
* keySet = ks;
* }
* return ks;
* }
*}</pre>
*/
// 类似entrySet的key访问对象
transient Set<K> keySet;
// 类似entrySet的value访问对象
transient Collection<V> values;

5. Inner classes

5.1简单存储对象

simpleEntry:键值对存储对象。

simpleImmutableEntry:不支持修改的键值对存储对象。

共性:都重写了equal方法,比较key & value

5.2 hash与红黑树

e g:

  • hash散列作用
  • hash算法选择
  • hash算法不可控因素
  • 数组与红黑树转换条件、原因
  • 红黑树机制简单介绍
5.2.1 什么是hash算法

又称散列算法,是一种从任意文件中创造小的数字「指纹」的方法。

与指纹一样,散列算法就是一种以较短的信息来保证文件唯一性的标志,这种标志与文件的每一个字节都相关,而且难以找到逆向规律。因此,当原有文件发生改变时,其标志值也会发生改变,从而告诉文件使用者当前的文件已经不是你所需求的文件。

特点:

  • 正向快速
  • 逆向困难
  • 输入敏感
  • 冲突避免:很难找到两段内容不同的明文,使得它们的 hash 值一致(发生冲突)。

常见实现方式:取模/异或/位运算

下面给出在Java中几个常用的哈希码(hashCode)的算法。

  1. Object类的hashCode. 返回对象的经过处理后的内存地址,由于每个对象的内存地址都不一样,所以哈希码也不一样。这个是native方法,取决于JVM的内部设计,一般是某种C地址的偏移。
  2. String类的hashCode. 根据String类包含的字符串的内容,根据一种特殊算法返回哈希码,只要字符串的内容相同,返回的哈希码也相同。
  3. Integer等包装类,返回的哈希码就是Integer对象里所包含的那个整数的数值,例如Integer i1=new Integer(100), i1.hashCode的值就是100 。由此可见,2个一样大小的Integer对象,返回的哈希码也一样。
  4. int,char这样的基础类,它们不需要hashCode,如果需要存储时,将进行自动装箱操作,计算方法同上。
5.2.2 散列

return (key == null) ? 0 : (h = key.hashCode()) ^ (h >>> 16);

取hashcode参与运算,因为hashcode返回值为int(32位)

所以,将hashcode右移16位,再做异或运算,混合了高低位信息。

5.2.3 从hash到index

i = (tab.length - 1) & hash;

与运算 – 取一个数中指定位

index取值范围[0, tab.lenth -1]

与运算后,即为对应index。

从上文 [从hash到index](#####5.2.3 从hash到index)可以发现,在寻找map元素位置过程中,使用了与运算取hash结果中指定位。结合map容量字段定义,可以发现,容量为2的指数倍,才能刚好取hash结果中指定位。另一个可得结果是,map的数组大小,不会超过2的16次方。

5.2.4 hash碰撞

散列算法分布均为,也无法控制输入的数据分布。所以碰撞时必然的,甚至分布十分集中。

数组中存储 Node 对象。

当该index中对象碰撞较少时,以链表存储。新node放置在最前面。

当该index中对象碰撞较多时,链表转换为红黑树TreeNode。红黑树高度为log n,所以其查找、插入、删除操作复杂度均为O(log N)。

5.2.5 红黑树介绍(省略……哈哈)

5.3 并行处理API(忽略)

5.4 map数据访问对象

  • entrySet
  • keySet
  • valueSet

隐藏了底层复杂的数据结构(数组、链表、红黑树)。简化数据遍历。

实例化的内部类,持有hashmap的字段信息。同时自身也会被作为hashmap中字段存储,可视为map的视图。

5.4.1 map数据遍历与删除对象

set对象内部封装了迭代器,允许在迭代器中遍历、删除对象。

iterator 保存了当前Node 和下一个Node 。如果此时其他线程修改了map结构(增加删除key),那么下一个Node可能就不再存在,或者变成别的。

fast-fail

hashmap是不允许并发操作的(get除外)

  • 并发put/remove
    • hash碰撞时,链表新元素插入头部。先put的可能消失
    • hash碰撞时,红黑树同上(根结点会被替换)
  • 并发(put/remove) 和 iterator
    • 每次key发生变化,modCount就会+1。iterator每次next后,都会检查next操作后modCount与iterator保存的expectedModCount是否一致。一旦不一致,就会抛出ConcurrentModificationException

Ps: fast-fail可以作为一个代码bug的提示。hashmap普通的修改操作不支持并发,可能会丢数据(扩容问题cover later)。iterator操作时,完全不允许key发生变更,故一旦发现操作数发生变化,抛出异常。

6. 扩容过程&并发扩容问题

死循环

参考文献

什么是LRU

​ LRU: Least Recent Used。意即:最近最少使用。写代码好几年,我一直这么以为的。

​ 重读wiki,我觉着当初这个名字,起的真不怎么样。具体实现中,实际上要淘汰的是,当前最久未被访问数据。如果一定要换个名字,我觉着 “最近最久未被访问” 策略更合适。

###基本实现

统计最长未被访问,一般有两种方式。

  1. 记录上次访问时间(淘汰掉,访问时间最早的)
  2. 记录上次访问后,访问了多少次其他缓存(淘汰掉未被访问次数最多的)

​ wiki基准实现描述的上述第二种方式。以node对象为载体,记录。

1
2
3
4
5
6
7
8
9
10
11
public class Node {
private String key;
private Object value;
private int unAccessCount;

public Object get(String key){
// 省略
unAccessCount = 0
// 其他node unAccessCount++
}
}

​ 当缓存空间占满,需要淘汰内存页时,选择unAccessCount最大的节点,丢弃之。

​ 对应操作系统虚拟内存来说,如果node是具体内存页,那么我们还需要一个内存表来辅助查询。例如,添加一个map,处理key与node的引用。加速key的检索过程。

缺陷:

  • 每次访问一个随机key A,其他所有key需要加 1

LRU变体

1. 变体 - 基准实现的优化

​ 使用一个固定长度的双向链表存储node。A端为最新访问数据,B端为最久未访问数据。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
public class LruCache {
// 当前最新插入或访问的缓存
private Node first;
// 需要时,被过期的缓存
private Node last;
// 缓存容量
private int capitalSize;
// 存储缓存数据(双向链表) -- 增删快
private Node nodes;
// 提供缓存命中查询 -- 查询快
private ConcurrentHashMap<String, Node> cache;

public value get(String key){
Object value = cache.get(key);
if (value != null) {
removeNode(key);
addFirstNode(key, value);
}
return value;
}

public void put(String key, Object value){
Object oldValue = cache.get(key);
if (oldValue != null){
removeNode(key);
addFirstNode(key, value);
}

if (cache.size() >= capitalSize){
removeLastNode();
}
}

public vlid addFirstNode(String key, Object value);

public void removeNode(String key);

public void removeLastNode();
}

优点:

  • 相较基础版每次修改accessCount,操作数明显降低

缺点:

  • 与基础版相同,最近访问一次,就会被当作热点数据(LRU均存在该问题

2. 变体 - redis方案

​ redis自带过期机制,所以,其LRU实现是基于时间的。

早期方案:

​ 缓存容量满时,redis基于 server.maxmemory_samples 随机抽取指定数量的key,过期其中访问时间最早的。

优点:

  • 过期操作快

缺点:

  • 过期key随机性较大,刚存入的数据,也可能被抽中

改进方案:

​ 创建一个指定容量的等待过期队列。

​ 第一步,随机抽取一个key,置入队列;第二步,随机抽取指定个数key,将访问时间小于池中最小访问时间的key置入队列。

​ 缓存容量不足时,过期队列中访问时间最早的key。

优点:

  • 异步维护待过期队列
  • 过期操作快
redis为什么要自定义过期策略
  • redis key数量不固定。如果key数量过多,选择全局最久未访问key耗时高
  • redis为单线程应用,不允许过期策略过度消耗
  • redis本身存在过期策略。配置容器满适用LRU时,需要与LRU策略兼容

LFU

​ Least-frequently used:最小访问频次,频次最低的会被过期。与LRU相似。

​ 记录访问次数,存活时间,可得。

PLRU(CPU缓存过期策略)

Pseudo-LRU:伪 - LRU。适用于cpu这样的多级缓存系统。

​ CPU缓存系统,每一级维护LRU策略成本过高;所以,CPU设计者们,采用了近似的LRU算法。例如:L1层级区块L1a,对应L2层级L2a、L2b两个区块,则针对L1a每次过期L2a、L2b两个区块中的一个(而不是针对整个L2层过期最近最久未访问)。

原文地址:http://mechanitis.blogspot.com/2011/07/dissecting-disruptor-writing-to-ring.html(因被墙移到墙内)

作者:Trisha

​ 这是disruptor端到端视图中缺失的部分。撑住,这相当长!

要点:

  • 无重叠环
  • 通知消费者
  • 批生产
  • 多生产这协作

生产者屏障

disruptor 源码中有消费者的接口和帮助类,但是没有生产者接口。因为你只需要了解生产者,而不需要额外的访问。像消费端一样, ring buffer 创建了一个生产者屏障,生产者使用 producer barrier 写入数据到 ring buffer

​ 写入 ring buffer 涉及到两阶段提交。首先,生产者必须申明 ring buffer 中下一个插槽的所有权;之后,当生产者完成对该插槽的写入,它将提交事物到 producer barrier

​ 我们先看第一步。“给我 ring buffer 的下一个插槽”,这听起来很简单。确实,从生产者角度来看这确实简单。只需要调用 producer barriernextEntry() 方法,就会给你返回下一个插槽的实体对象。

生产者屏障保证ring buffer不重叠

​ 表层之下,producer barrier 确认了下一个插槽是啥,并且你是否有权限写入。

​ 在上图中,我们假设只有一个生产者写入 ring buffer 。随后,我们会来处理复杂的多生产者模式。

ConsumerTrackingProducerBarrier 持有一系列访问 ring buffer 的消费者。对于我来说,现在看起来有点奇怪。我不想在 producer barrier 关心消费端。不过,这是有原因的。我们不希望队列存在归并关系,我们的消费者有责任知道自己的关心的序列号。所以,如果我们想确认我们没有重叠缓存,我们必须检查消费者的消费位置。

​ 上图中,一个消费者恰巧位于最高序列号12;另一个消费者滞后一些,位于序列号3,可能在执行IO等等。因此,消费者2在赶上消费者1之前,需要消费整个缓冲区长度的消息。

​ 生产者想向 ring buffer 中当前被序列号3占据位置写入数据,因为这个插槽刚好在 ring buffer 当前指针之后。当时 producer barrier 知道它当前并不能写入,因为有一个消费者正在使用它。然后 producer barrier 只能停下来,自旋,等待该消费者让出插槽。

申明下一个插槽

​ 现在,我们想象一下,消费者2结束那批消息的消费。也许它消费到序列号9的位置(在现实生活中,因为消费者批量消费,往往消费到序列号12的位置。不过,那样的话样例就变得没意思了)

​ 上面这张图演示了消费者2更新到序列号9的情形。因为 consumer barrier 在这里没有作用,我省略它。

producer barrier 看到下一个插槽变得可用。它窃取了插槽中的实体对象(我还没有专门讨论过这个实体类。简要的说,它基本就是一个筐,存放任何你想放进 ring buffer 的,携带序列号的东西)。设置实体序列号为下一个值(13),然后返回实体到生产者。生产者就能写入任何它想写入的东西。

提交新值

​ 两阶段提交的第二阶段,即是提交。

​ 绿色插槽代表我们刚更新的序列号13的实体。

​ 当生产者将消息写入实体,它会通知 producer barrier 去提交。

producer barrier 等待 ring buffer 指针捕获我们的位置(在单生产者时,这是毫无意义的。示例:我们知道指针在序列号12位置,没有什么会再写入该序列号。)producer barrier 更新 ring buffer 指针到序列号13。下一步,producer barrier 告诉消费者新消息已经准备好。这依赖于唤醒 consumer barrierwait strategy “醒醒,有事做了”(根据阻塞与否,不同等待策略实现处理方式不同)。

​ 现在消费者1可以消费实体13,消费者2可以 消费13及之前实体。他们愉快的工作着。

生产者屏障批处理(并发写入)

​ 有趣的是,disruptor 可以处理多个生产者,就像消费者端一样。回想一下,消费者2什么时候执行程序,并发现自己在位置9的?这里 producer barrier 做了一件狡猾的事 - 它知道缓冲区的长度,知道消费最慢的消费者位置。所以,它能知道哪个插槽可用。

​ 假使 producer barrier 知道 ring buffer 的指针在位置12,最慢的消费者在位置9。它就能让生产者写入插槽3,4,5,6,7,8。

多生产者

​ 你是否觉着我已经考虑完了?实际上这里还有更多。

​ 上面图中我略有撒谎。我暗示 producer barrier 使用的序列号,直接来源于 ring buffer 的指针。然而,如果查看代码,你会发现它使用 claim strategy 来获取序列号。我跳过这里以简化图形,因为它在单生产者中并不重要。

​ 在多生产者中,你需要另一个组件来跟踪可以写入的序列号。注意,这不是 ring buffer 指针➕1那么简单 - 如果你不仅有一个生产者,有些实体可能正在北写入,且没有被提交。

​ 让我们重温申明一个插槽。每一个生产者询问 claim strategy 下一个可用插槽。就像上面单生产者一样,生产者1拿到了序列号13。即使 ring buffer 指针任然指向12,生产者2会拿到序列号14。因为 claim sequence 跟踪已经被分配的序列号。

​ 所以,每个生产者拿到它独占的插槽和崭新的序列号。

​ 我把生产者1和它的插槽标记为绿色;生产者2和它的插槽标记为可疑的粉色。

​ 现在,想象一下生产者1脱离了掌控,处于某种原因未曾提交。生产者2已经准备好提交,询问 producer barrier 是否可以提交。

​ 之前提交图中所示,producer barrier 只有当ring buffer 的指针到达想要提交的插槽后方时,它才会提交。在这个案例中,指针抵达13,我们才能提交14。因为生产者1盯着某些闪光的东西没有提交,我们不能提交14。所以 claim strategy 等在那里直到 ring buffer 指针到达指定位置。

​ 现在生产者1从昏迷中醒来,提交实例13。producer barrier 告诉 claim stratey ring buffer 指针要在12位置。然后 ring buffer 指针增长到13,producer barrier 通知 wait strategy 让消费者们知道 ring buffer 已经更新。此时,producer barrier 才能结束生产者2的请求,ring buffer 指针增长到14,通知所有消费者我们已经更新。

​ 你可以发现,就算生产者结束写入的时间不同,ring buffer 始终保持了初始调用nextEntry() 的顺序。这意味着一旦某个生产者写入时暂停,当它接触锁定后,其他后续提交可以立即执行。

Ps:

  1. 最新版本中, ring buffer 隐藏了 producer barrier 。如果你找不到,就当它存在于 ring buffer
  2. 2.0版本中,有写类名发生了改变。如果你感到困惑,请参考my summary of changes

引言

当代计算机存储体系,基本构成磁盘、内存、cpu高速缓存、cpu寄存器。

以cpu为例:

​ 现代cpu都是多核cpu,cpu一般有3级缓存。其中二级缓存是专用的,多核处理器中,每个内核独享L2 cache;三级缓存则是高层级的缓存,它被所有内核共享。

什么是伪共享

​ 伪共享主要发生在L2 cache。内核独享L2,为了保证不同内核L2数据的一致性,缓存机制会强制刷新已更改的L2。

​ 一个数据被修改就会触发L2同步,这很直观,看起来也毫无破绽。不过,考虑到内存块设计,事情就不会这么简单了。内存通常被划分为2的4、8、16等次方KB 或者 MB大小(小批量的读写,可以兼顾吞吐量和实时性)。

​ 一旦触发L2同步,那么整个内存块上的数据都会被强制重载。那些即使不曾发生变化的数据,也会被重载。

案例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
struct foo {
int x;
int y;
};

static struct foo f;

/* The two following functions are running concurrently: */

int sum_a(void)
{
int s = 0;
for (int i = 0; i < 1000000; ++i)
s += f.x;
return s;
}

void inc_b(void)
{
for (int i = 0; i < 1000000; ++i)
++f.y;
}

(不同内核,同时执行上面方法)

这里x,y会出现在L2同一内存块,x值一直不变,但是由于y发生改变。sum_a必须不断的从主内存加载x。

ps:

java代码中,一般只关注对并发修改字段的处理(加锁)。就算写到这里,我也还没想好怎么应用到自己的代码中去。if you can do, plealse mail to me. Ths!

ps2:

神奇的解决方法 — 内存块补全。x放入内存后,塞点无用数据,是内存块不能再加载y。

参考:

guide of kudu

impala上操作kudu

  1. 建表

    1. ```sql
      CREATE TABLE dc_stg.kudu_crm_dm_dealer_logs_new (

      id  bigint COMMENT '主键id',
      dealer_id  bigint  COMMENT '门店ID',
      account_id  bigint COMMENT '账户ID',
      cuid  bigint COMMENT '创建者',
      opt_info  string COMMENT '信息',
      created  string  COMMENT '创建时间',
      status  int  COMMENT '状态',
      remark  string COMMENT '审核备注/操作记录',
      username  string COMMENT '创建者姓名',
      

      PRIMARY KEY ( id )
      ) PARTITION BY HASH PARTITIONS 16 COMMENT ‘门店操作日志’
      STORED AS KUDU;

      1
      2
      3
      4
      5
      6
      7

      2. 特征`STORED AS KUDU`

      2. 删表

      1. ```sql
      drop TABLE dc_stg.kudu_crm_dm_dealer_logs_new;
    2. 此时,impala中关于kudu表的映射关系已删除;但是,kudu表依旧存在

    3. ```shell
      kudu table delete bigdata01:7051,bigdata04:7051 impala::dc_stg.kudu_crm_dm_dealer_logs_new

      1
      2
      3
      4
      5
      6
      7

      4. ⚠️表名前impala::不可缺失

      3. 数据刷新

      1. ```sql
      refresh dc_stg.kudu_crm_dm_dealer_logs_new;

Welcome to Hexo! This is your very first post. Check documentation for more info. If you get any problems when using Hexo, you can find the answer in troubleshooting or you can ask me on GitHub.

Quick Start

Create a new post

1
$ hexo new "My New Post"

More info: Writing

Run server

1
$ hexo server

More info: Server

Generate static files

1
$ hexo generate

More info: Generating

Deploy to remote sites

1
$ hexo deploy

More info: Deployment