curl命令示例

示例:

1
curl -v  -H Content-Type:application/json -H appKey:2DHYBU6 -X POST --data '[{ "indexCode": "00001"}]' http://10.41.10.21/start
  • -v 展示详细信息
  • -H 添加header
  • -X POST 指定为Post请求
  • –data HTTP POST方式传送数据

更多用法可以参考阮一峰的curl用法指南

windows修改命令行字符编码

1
chcp 65001

然后右键属性,字体修改为
【Lucida Console】

Spring @Transactional学习

项目中出现几个关于Spring事务的异常,排查之后发现都是因为@Transactional应用不当导致的。

源码学习

在此分享学习@Transactional的过程,先看源码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
//可标记方法和类
@Target({ElementType.METHOD, ElementType.TYPE})
//运行时生效
@Retention(RetentionPolicy.RUNTIME)
//自动继承
@Inherited
@Documented
public @interface Transactional {

//指定TransactionManager
@AliasFor("transactionManager")
String value() default "";

//指定TransactionManager
@AliasFor("value")
String transactionManager() default "";
//指定事务传播设置
Propagation propagation() default Propagation.REQUIRED;

//指定事务隔离级别
Isolation isolation() default Isolation.DEFAULT;

//指定超时时间
int timeout() default TransactionDefinition.TIMEOUT_DEFAULT;

//标记只读事务
boolean readOnly() default false;

//指定回滚的异常类型
Class<? extends Throwable>[] rollbackFor() default {};

String[] rollbackForClassName() default {};

//指定不会滚的异常类型
Class<? extends Throwable>[] noRollbackFor() default {};

String[] noRollbackForClassName() default {};

}

除了Propagation其他设置都比较简单,其中Isolation总共有5种,Default就是采用数据库默认的事务隔离级别,其他四种分别指定对应数据库的隔离级别,感兴趣的话可以看下《数据库事务》

而Propagation可以看做使用事务的策略,下面来看下Propagation的代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
public enum Propagation {
//REQUIRED必须有事务,也就是说当前线程有事务就用,没有就New一个
REQUIRED(TransactionDefinition.PROPAGATION_REQUIRED),

//支持事务,即有事务就用,没有就不用
SUPPORTS(TransactionDefinition.PROPAGATION_SUPPORTS),

//强制事务,必须有,没有就抛异常
MANDATORY(TransactionDefinition.PROPAGATION_MANDATORY),

//必须new事务
REQUIRES_NEW(TransactionDefinition.PROPAGATION_REQUIRES_NEW),

//非事务操作,暂停当前事务,只对分布式JtaTransactionManager有效
NOT_SUPPORTED(TransactionDefinition.PROPAGATION_NOT_SUPPORTED),

//不能在事务内执行,存在事务则抛异常
NEVER(TransactionDefinition.PROPAGATION_NEVER),
//嵌套事务(JDBC3.0 DataSourceTransactionManager下有效)
NESTED(TransactionDefinition.PROPAGATION_NESTED);

//省略
}

场景: 循环内部调用Transactional方法时,可以设置为Propagation.REQUIRES_NEW 并在循环体内部捕捉异常,防止单个操作失败,导致所有操作都被回滚。

注意事项

  1. 只对public方法生效
  2. 自调用无效:被内部其他方法调用时,不生效
    据说AspectJ可以避免这两种情况,有待研究

Spring定时任务BLOCK问题排查

问题现象

设备状态不正确,查日志发现间定时轮询设备状态的任务 已经快一天没打日志了。
另外:

  1. 日志没有相关错误信息
  2. 其它定时任务正常执行

解决过程

在StackOVerFlow上搜到一个类似的问题,是由定时任务中的HttpClient阻塞引起的。
相似度很高,都是在Scheduled Task中发起远程调用。
参考回答,先用jstack -F pid 拿到线程快照,查了一下,发现该任务的堆栈Block在ArrayBlockingQueue.take()方法上,代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
public class HkpClientHandler extends SimpleChannelInboundHandler<MessagePackage> {
private BlockingQueue<Object> response = new ArrayBlockingQueue<Object>(1);

@Override
protected void channelRead0(ChannelHandlerContext ctx, MessagePackage msg) throws Exception {
response.add(msg.getXmlMessage());
ctx.close();
}
public String getResponse() throws InterruptedException {
Object result = response.take();
if (result instanceof Throwable) {
Throwable cause = (Throwable) result;
throw new ServerException(ResponseCode.FAIL, cause);
}
return (String) result;
}
//省略
}

此处在netty的ClientHandler中用BlockingQueue存放Decoded Response,这样调用HkpClientInitializer.getResponse()就可以阻塞直到服务端返回。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
public class HkpClientInitializer extends ChannelInitializer<SocketChannel> {
private HkpClientHandler clientHandler = new HkpClientHandler();

@Override
protected void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();

pipeline.addLast(new LoggingHandler());
pipeline.addLast("MessageDecoder", new MessagePackageDecoder(1024 * 1024, 4, 4, -8, 0));
pipeline.addLast("MessageEncoder", new MessagePackageEncoder());
pipeline.addLast("ReadTimeout", new ReadTimeoutHandler(5));
// 客户端的逻辑
pipeline.addLast("handler", clientHandler);
}
public String getResponse() throws InterruptedException {
return clientHandler.getResponse();
}
}

但是万万没想到,都已经设置了ReadTimeout,还是被阻塞了,对照了服务端的日志,发现服务端返回毫无问题。由于时间紧急,考虑到问题出现概率较小、单次状态查询失败几无影响,阻塞的地方改成快速失败足以解决问题:

1
2
3
4
Object result = response.poll(8, TimeUnit.SECONDS);
if (result == null){
throw new ServerException("Timeout recieving response from remote!");
}

根因分析

虽然暂时解决了问题,但是这个Netty客户端无疑还有瑕疵,偶尔收不到服务端的返回对于某些业务是无法接受的。
因此,还需要找到问题的根本原因。
从channelRead0入手,往上追溯,看看信息在哪里被遗漏掉了。

数据库事务

事务特性

  • 原子性
  • 一致性:事务执行前后 数据都出于一致性状态
    • 逻辑正确,转账、消费等业务
    • 相同数据在多个位置 值一致
  • 隔离性
  • 持久性

事务隔离级别==>决定锁的策略和粒度

  1. 读未提交(Read uncommitted)。

    写操作加写锁,读操作不加锁。可防止丢失更新。

2.读已提交(Read committed)。

写操作加写锁,读操作加读锁。防止脏读。这是大部分关系数据库的默认 隔离级别。

3.可重复读(Read repeatable)。

对于读操作加读锁到事务结束,其他事务的更新操作只能等到事务结束之后进行。不禁insert和delete,引起幻读

4.序列化(Serializable)。

读操作加表级读锁至事务结束。可以禁止幻读。

数据库锁

策略

  • 共享锁:允许多个持有,一般用来读
  • 排他锁
  • 更新锁:即将转化为排他锁,先读后更新,防止死锁
  • 意向锁 (在更粗的锁粒度上 标记锁占用)

粒度

  • 行rowlock
  • 页pagelock
  • 表tablelock

悲观锁&乐观锁

  • 悲观锁:利用数据库本身的锁机制实现。
    通过上面对数据库锁的了解,可以根据具体业务情况综合使用事务隔离级别与合理的手工指定锁的方式比如降低锁的粒度等减少并发等待。
  • 乐观锁:CAS策略。方式大概有以下3种
    • 对记录加版本号.
    • 对记录加时间戳.
    • 对将要更新的数据进行提前读取、事后对比。

Lock源码浅析

Lock

先看Lock接口

1
2
3
4
5
6
7
8
9
10
11
12
13
14
public interface Lock {
//获取锁
void lock();
//获取锁过程中监听中断状态
void lockInterruptibly() throws InterruptedException;
//尝试获取锁,立即返回true或false
boolean tryLock();
//一定时间内尝试获取锁,允许中断
boolean tryLock(long time, TimeUnit unit) throws InterruptedException;
//释放锁,一般在finally中
void unlock();
//返回一个新建的Condition,用于实现wait/signal机制,释放锁并等待唤醒;应用:阻塞队列、CyclicBarrier、线程池等
Condition newCondition();
}

Lock比synchronized更灵活

  • 允许中断
  • 可以设置等待时限
  • 通过返回判断状态

还提供了Object wait/notify的增强版,方便线程协作。

ReentrantLock

再来看Lock的实现ReentrantLock,除了Lock规定的方法,它还提供了一些额外的方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
public class ReentrantLock implements Lock, java.io.Serializable {

public ReentrantLock(boolean fair) {
sync = fair ? new FairSync() : new NonfairSync();
}

public int getHoldCount() {
return sync.getHoldCount();
}
public boolean isHeldByCurrentThread() {
return sync.isHeldExclusively();
}
public boolean isLocked() {
return sync.isLocked();
}
public final boolean hasQueuedThreads() {
return sync.hasQueuedThreads();
}
public final boolean hasQueuedThread(Thread thread) {
return sync.isQueued(thread);
}
public final int getQueueLength() {
return sync.getQueueLength();
}
protected Collection<Thread> getQueuedThreads() {
return sync.getQueuedThreads();
}
public boolean hasWaiters(Condition condition) {
if (condition == null)
throw new NullPointerException();
if (!(condition instanceof AbstractQueuedSynchronizer.ConditionObject))
throw new IllegalArgumentException("not owner");
return sync.hasWaiters((AbstractQueuedSynchronizer.ConditionObject)condition);
}
public int getWaitQueueLength(Condition condition) {
if (condition == null)
throw new NullPointerException();
if (!(condition instanceof AbstractQueuedSynchronizer.ConditionObject))
throw new IllegalArgumentException("not owner");
return sync.getWaitQueueLength((AbstractQueuedSynchronizer.ConditionObject)condition);
}
protected Collection<Thread> getWaitingThreads(Condition condition) {
if (condition == null)
throw new NullPointerException();
if (!(condition instanceof AbstractQueuedSynchronizer.ConditionObject))
throw new IllegalArgumentException("not owner");
return sync.getWaitingThreads((AbstractQueuedSynchronizer.ConditionObject)condition);
}
}

以上方法主要提供3个方面的功能:

  • 公平锁实现
  • 锁的状态(包括持锁线程、重入次数、排队线程等)
  • Condtion相关状态

ReentrantReadWriteLock

读写锁
ReentrantReadWriteLock包含readLock和writeLock,共享一个Syn子,其中readLock为共享锁,writeLock为排他锁,实现读写锁分离。

synchronized

修饰代码段或方法,非公平互斥锁,需指定对象。

volatile

  • 防止指令重排
  • 变量使用前从主存刷新

Condition应用

- BlockingQueue
    - notEmpty.signal()
    - notFull.signal()
- CyclicBarrier
    - trip.signalAll()
- ScheduledThreadPoolExecutor