分库分表


数据库优化之分库分表


针对数据库的优化有两点,第一是从整体层面优化,设计到读写分离和分库分表。第二是从sql层面优化,主要是涉及到索引相关的一些东西。


读写分离

需求: 读写分离的目的是做数据冗余备份,同时将读写分离,提升数据库的服务性能(IO性能瓶颈)。

搭建过程: 在基本操作里面->mysql配置主从同步的步骤 。

实现原理: 首先是master数据库会先将数据库的变化存储在binlog文件中。在slave数据库上,会有一个IO Thread负责将binlog文件读取到内部的relaylog文件中。同时,slave数据库上的另外一个线程SQL Thread读取relaylog,将数据写入到slave数据库里面。

细节: mysql的binlog文件存储在 /var/lib/mysql文件夹下;使用命令-> mysqlbinlog –base64-output=decode-rows -v mysql-bin.000001 查看binlog的内容;binlog的格式:statement(默认,基于sql语句模式,针对数据更新的一些函数now()等,数据延迟造成的不一致),row:基于行模式,记录修改后每一条数据变化的值,mixed:混合模式,由mysql自动判断处理。

问题:同步延迟,master tps较高时出现同步延迟; 网络延迟;磁盘IO


分库分表

垂直拆分

概念:垂直拆分主要是通过业务层面,将各个业务模块所使用到的表放到各自不同的数据库里面,做到各个业务模块库相互隔离的目的。

问题:若是各个业务模块的表之间存在一些关联查询,需要将这些查询改为服务调用的方式。 针对一些全局表,可以改为服务调用的方式,对外提供服务。

水平拆分

概念:将一张大的表拆分成n多张小表。实现方式,第一种是通过一致性hash(若是新增表的话,会涉及到数据迁移的问题)。第二种是可以按照id的范围来拆分。第三种是通过日期来拆分。

问题:唯一主键问题,可以使用zk自增id 可以使用redis的自增id 可以使用代理的id


Mycat

1. 从github上面clone项目 https://github.com/MyCATApache/Mycat-Server.git
2. 修改项目中schema.xml配置文件中的dataHost数据库配置节点信息,修改为可用的数据库
3. MycatStartup类运行main函数启动
idea启动前需要在配置参数VM options: -DMYCAT_HOME=D:\code\Mycat-Server\src\main
4. 通过数据库连接工具可以连接到MyCat,数据库相关配置在server.xml配置文件里面
默认的用户名: root 密码: 123456 port: 8066
5. mycat里面有三个比较重要的配置文件
server.xml 配置数据库连接相关的一些信息
schema.xml 配置数据库的一些节点信息
rule.xml 配置一些路由规则
6. 支持单库分表 支持跨库分表 支持配置读写分离(writeHost和readHost节点)

基本操作


centos7上mysql的安装步骤

1.下载mysql的repo源  > wget http://repo.mysql.com/mysql57-community-release-el7-8.noarch.rpm
2.安装源> rpm -ivh mysql57-community-release-el7-8.noarch.rpm
3.安装数据库->yum install mysql-server
4.启动数据库> systemctl start mysqld
5.查看mysql为root账号生成的随机密码>grep "password" /var/log/mysqld.log
说明root@localhost:此处为随机密码
6.运行mysql -uroot -p回车
7.粘贴随机密码->此时已经登录到mysql数据库,需要为root账号设置密码
8.由于mysql5.7有对密码设置的验证,简单密码设置不了,需要运行下面两条命令关闭验证
> set global validate_password_length=1;
> set global validate_password_policy=0;
9.修改root账号的密码
> set password = password('123456');
10.修改root账号的权限
> GRANT ALL PRIVILEGES ON *.* TO 'root'@'%' IDENTIFIED BY 'root' WITH GRANT OPTION;
11.可以创建一个其它的用户
> create user repl identified by 'repl';
12.为该用户授权
> grant replication slave on *.* to 'repl'@'%' identified by 'repl';(数据同步的权限)
> GRANT ALL ON *.* TO 'pig'@'%';(为用户pig授予所有权限)

mysql配置主从同步的步骤

1.在master服务器上创建一个可以进行数据同步的账户。
> create user repl identified by 'repl';
2.在master服务器上为该用户授权。
> grant replication slave on *.* to 'repl'@'%' identified by 'repl';
3.在master服务器上修改/etc/my.cnf文件。
[mysqld]
log-bin=mysql-bin
server-id=147
4.在master上面重启mysql
> systemctl restart mysqld
5.在master上登录数据库,使用下面命令查看二进制文件。
> show master status;
6.在slave服务器上修改/etc/my.cnf文件
[mysqld]
server-id=149
relay-log=slave-relay-bin
relay-log-index=slave-relay-bin.index
read-only=1
7.在slave服务器上重启mysql
> systemctl restart mysqld
8.在slave上登录数据库, 执行下列命令,设置master相关参数。
> change master to master_host='192.168.25.147',master_port=3306,master_user='repl',master_password='repl',master_log_file='mysql-bin.000003', master_log_pos=154;
9.在slave上运行命令,启动服务
> start slave;
10.查看状态,查看主从同步设置是否成功
> show slave status\G;
11.搭建完成,现在就可以玩啦,尝试在master上面创建数据库和表,在slave上会看到相应的数据库和表。

索引

1.数据库四大特性

​ A( Atomicity 原子性): 数据库最小的工作单元,整个工作单元要么一起提交成功,要么一起失败回滚。
​ C( Consistency 一致性): 事物中操作的数据的状态是一致的。即写入数据的结果必须完全符合预设的规则,不会因为出现系统意外等原因导致状态的不一致。
​ I( Isolation 隔离性): 一个事务所操作的数据在提交之前,对其他事务的可见性设定(一般设定为不可见)。
​ D( Durability 持久性): 数据库的数据一旦提交,无法更改。

2.多个事物并发引起的数据读取问题

​ 脏读: 是指一个事物读取到了另外一个事物未提交的数据。
​ 不可重复读: 是指在一个事物未结束之前, 前后两次读取到的数据不一致现象。原因在于该事物在前后两次读取数据之间,另外一个事物修改了该数据。(不可重复读的重点在于修改)。
​ 幻读: 是指当一个事物修改了数据库表中某一个范围内的数据的某一个字段,但是另外一个事物在此期间又在该范围内插入了一条新的数据,造成前一个事物出现幻觉(没有完全修改)。(幻读的重点在于新增或删除) 。

3.数据库事物的隔离级别

Read Uncommited 读未提交: 事物未提交对其它的事物也是可见的。
Read Commited 读已提交: 一个事物只能够读取到已提交的数据。(解决脏读, 未解决不可重复读)。
Repeatable Read 可重复读: 一个事物对数据的前后读取结果是一致的。(解决了不可重复读, 未解决幻读)。
Serializable 串行化: 数据库最高的隔离级别, 强制所有事物串行执行,解决了所有并发问题。

4.聚集索引和非聚集索引的区别
聚集索引: 表中的数据是按照索引的顺序来存储的。索引的叶子节点上存储了真实的数据,不会有另外单独的数据页。
非聚集索引: 表中的数据存储不依赖于索引的顺序。索引的叶子节点上存储了索引的关键字和指向真实数据的指针。

5.sql调优

​ a. 创建索引 b.使用临时表存储中间结果->(避免多次扫描主表)。c. 避免在索引上使用计算。4.少使用select *,只返回需要的字段。

6.如何理解MVCC

​ a. 数据库每张表会单独维护两个字段,数据行版本号和删除版本号。

​ b. 当执行insert操作时,我们开启了一个事物,执行数据插入操作时,会将这个事物的事物id设置到数据行版本号这个字段中(这个事物的事物id属于数据库一个全局属性,自增)。

​ c. 当执行delete操作时,我们开启了一个事物,执行数据删除操作时,会将这个事物的事物id设置到删除版本号这个字段中。

​ d. 当执行update操作时,我们开启了一个事物,指定数据更新操作时,会将这行数据copy一份,copy的这份数据数据行版本号为当前事物id,删除版本号为Null,并更新相关字段。原先那行的删除版本号会被设置为当前的事物id。

​ e. 当我们执行数据库查询时,满足以下两点要求:

​ e1:查找数据的数据行版本号小于或等于当前事物id。(保证该条数据在当前事物开启之前就已经存在或者为该事物添加的数据。

​ e2:查找数据的删除版本号为Null或者删除版本号大于当前事物id。(该条规则可以确保当前事物在开始之前数据还未被删除。

7.MVCC解决的问题与未解决的问题

若是一个查询先于一行的数据更新,不会出现问题。若是一个查询后于一行的数据更新,会产生脏读的问题。

8.Innodb数据库的四种隔离级别是如何实现的

​ 读未提交:对select操作不会加锁,并发性能是最好的,但是容易造成脏读。

​ 读已提交(互联网上默认的隔离级别):普通的数据读取是会直接读取数据快照,加锁的select,update等操作会使用记录锁。注意:读已提交读取快照时,一个事物读取了数据,但是当第二次读取的时候,另外一个事物已经将该快照刷新了,所以会造成不可重复读的问题。

​ 可重复读(Innodb默认的事物隔离级别):对于普通的数据库查询,使用读取快照的方式。对于加锁的select,update等语句,他们加锁的力度取决于查询条件是使用了唯一索引还是使用了范围查询。若是使用了唯一索引,会使用记录锁的方式。若是使用了范围查询,会使用间隙锁,避免发生不可重复读。注意:当一个事物开启读取数据时,前后两次读取的都是同一个快照,这样就可以实现了可重复读。

​ 串行化:针对所有的操作都会去加锁,普通的select操作会去加共享锁->select * from table in share mode。对于 update 等操作会加排他锁。若是一个事物查询操作时,正好有一个事物对改行的数据做修改操作。则该查询操作会阻塞,直到更新操作执行完成。

9.如何理解快照读和当前读

​ 快照读:读取的数据是快照。当前读:读取的数据是数据库的最新的数据。

tcp


此时握手

四次挥手

为什么要三次握手

为什么要四次挥手

socket的几种状态

listen:侦听来自远方tcp端口的连接请求

syn-sent:在发送连接请求后,等待匹配的连接请求。

syn-receive:在收到和发送一个连接请求后,等待连接请求的确认

established:代表一个打开的连接,数据可以传送给用户

project-ask

项目专题

系统1

系统数据流转

任务执行流程

// 大体流程
1. 从原始借据表bp_collect_loan_src同步数据到bp_collect_loan,同步批次号为batch_no的数据。(同步数据时,每个线程处理数量默认为10000,若是当前批次数据量超过10000,会采用多线程来执行)
2. 针对每一个原始借据,数据库里面只有可能有一条与之对应的借据,所以将bp_collect_loan存入数据库的操作采用的是insertOrUpdate操作(任务执行失败后,不会对数据库中的数据造成影响)。
3. 借据同步完成之后,需要执行归案任务,就是将多个借据加工成为一个案件,以人为维度进行催收。
4. 归案完成之后,需要跑规则引擎来执行分案,通过规则引擎分案之后,会在数据库表bp_collect_divide表里面生成分案记录。
5. 将这些分案记录针对案件进行分案(分案记录里面存储了两类,针对个人的分案,针对组的分案)

// 使用到的技术点(分布式锁 redis队列)
1. 由于任务是部署在多台机器上的,多个进程执行任务时,需要获取分布式锁。项目中采用redis来实现了分布式锁,上锁命令采用set命令,外加是否存在,过期时间参数来实现。而释放锁的命令采用lua脚本来实现,因为会有两步骤操作,首先式判断上锁客户端和释放锁的客户端是否一致(存储在value中),其次是删除该锁对应的key,lua脚本保证了该操作的原子性。
2. 在执行归案的过程中,针对每一个案件,需要将该案件分出去,会过规则引擎。而将分案请求交给规则引擎有两种方式,第一种是先将该案件构建成为请求,然后将该请求放到redis的同步队列里面,规则引擎那边从同步队列里面来进行消费。第二种是若同步队列已满,或者放入到同步队列出错,则会将请求直接交给规则引擎来处理。

// 技术难点-如何克服

threadpool


线程池

// 创建线程池的工具类Executors
public class Executors {
// 创建一个只有一个工作线程的线程池
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}

// 创建一个固定数目线程的线程池(核心线程数和最大线程数相同)
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}

// 创建一个cache线程池,核心线程大小为0
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}


// 线程工厂,负责为线程池中的Worker创建线程-》可以创建个性化的线程
static class DefaultThreadFactory implements ThreadFactory {
private static final AtomicInteger poolNumber = new AtomicInteger(1);
private final ThreadGroup group;
private final AtomicInteger threadNumber = new AtomicInteger(1);
private final String namePrefix;

DefaultThreadFactory() {
SecurityManager s = System.getSecurityManager();
group = (s != null) ? s.getThreadGroup() :
Thread.currentThread().getThreadGroup();
namePrefix = "pool-" +
poolNumber.getAndIncrement() +
"-thread-";
}

public Thread newThread(Runnable r) {
// 返回的线程又将Worker做了一层封装,当该线程执行strart方法时,将会调用Worker的run方法
Thread t = new Thread(group, r,
namePrefix + threadNumber.getAndIncrement(),
0);
if (t.isDaemon())
t.setDaemon(false);
if (t.getPriority() != Thread.NORM_PRIORITY)
t.setPriority(Thread.NORM_PRIORITY);
return t;
}
}
}


// jdk定义的线程池
public class ThreadPoolExecutor extends AbstractExecutorService {
// 核心线程数
private volatile int corePoolSize;
// 最大线程数
private volatile int maximumPoolSize;
// 阻塞队列
private final BlockingQueue<Runnable> workQueue;
// 非工作线程存活时间
private volatile long keepAliveTime;
// 创建线程的工厂
private volatile ThreadFactory threadFactory;
// 线程池的拒绝策略
private volatile RejectedExecutionHandler handler;
// 存储创建好的Worker线程
private final HashSet<Worker> workers = new HashSet<Worker>();
// 主锁,对workers进行操作时会上锁
private final ReentrantLock mainLock = new ReentrantLock();

// 定义线程池内部的工作线程
private final class Worker extends AbstractQueuedSynchronizer implements Runnable{
// 当前Worker持有的线程
final Thread thread;
// 当前Worker执行的第一个任务
Runnable firstTask;
// 记录当前Worker已经完成的任务数量
volatile long completedTasks;

Worker(Runnable firstTask) {
// 将aqs中的state值由0设置为-1,禁止中断(上锁了)
setState(-1);
this.firstTask = firstTask;
// 调用工厂为当前的Worker创建一个Thread—>传入的是当前this对象
this.thread = getThreadFactory().newThread(this);
}

// 当前Worker线程启动需要执行的方法,该方法会由内部属性thread调用start方法时触发。
public void run() {
runWorker(this);
}

// 判断线程是否被独占
protected boolean isHeldExclusively() {
return getState() != 0;
}

// 尝试获取锁
protected boolean tryAcquire(int unused) {
if (compareAndSetState(0, 1)) {
setExclusiveOwnerThread(Thread.currentThread());
return true;
}
return false;
}

protected boolean tryRelease(int unused) {
setExclusiveOwnerThread(null);
setState(0);
return true;
}

public void lock() { acquire(1); }
public boolean tryLock() { return tryAcquire(1); }
public void unlock() { release(1); }
public boolean isLocked() { return isHeldExclusively(); }

void interruptIfStarted() {
Thread t;
if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
try {
t.interrupt();
} catch (SecurityException ignore) {
}
}
}
}

// Worker线程执行的真正逻辑
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock();//执行任务之前允许被打断
boolean completedAbruptly = true;
try {
// 当前worker的task不为null或者是阻塞队列不为null,worker线程会一直运行
while (task != null || (task = getTask()) != null) {
// 上锁
w.lock();
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
beforeExecute(wt, task);
Throwable thrown = null;
try {
// 调用worker的run方法
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
afterExecute(task, thrown);
}
} finally {
task = null;
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally {
processWorkerExit(w, completedAbruptly);
}
}







// 初始化一个线程池
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}

// 1.若是当前工作线程数小于核心线程数, 会去创建一个核心线程。
// 2.若是当前工作线程数大于等于核心线程数,将任务放入到阻塞队列。
// 3.若是阻塞队列已满,会去创建非核心线程。
// 4.若是创建非核心线程也失败,执行拒绝策略。
public void execute(Runnable command) {
int c = ctl.get();
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
else if (!addWorker(command, false))
reject(command);
}

// 1.尝试着将工作线程数加1,workers的个数。
// 2.创建一个Worker线程,并将其加入到workers数组中。
// 3.若是加入到workers数组成功,调用worker的thread的start方法,启动线程。
// 4.线程启动之后,会调用worker的run方法,而run方法又是调用runWorker(this)方法来执行的。
private boolean addWorker(Runnable firstTask, boolean core) {
retry:
// 自旋
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
if (rs >= SHUTDOWN && ! (rs == SHUTDOWN && firstTask == null &&
! workQueue.isEmpty()))
return false;
for (;;) {
int wc = workerCountOf(c);
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
if (compareAndIncrementWorkerCount(c))
break retry;
c = ctl.get();
if (runStateOf(c) != rs)
continue retry;
}
}

boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
w = new Worker(firstTask);
final Thread t = w.thread;
if (t != null) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
int rs = runStateOf(ctl.get());
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
if (t.isAlive())
throw new IllegalThreadStateException();
// 将创建好的Worker加入到workers数组中
workers.add(w);
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s;
workerAdded = true;
}
} finally {
mainLock.unlock();
}
if (workerAdded) {
t.start();
workerStarted = true;
}
}
} finally {
if (! workerStarted)
addWorkerFailed(w);
}
return workerStarted;
}









}

// spring线程池->ThreadPoolTaskExecutor
public class ThreadPoolTaskExecutor {
private final Object poolSizeMonitor = new Object();
// 核心线程数
private int corePoolSize = 1;
// 最大线程数
private int maxPoolSize = 2147483647;
// 非核心线程空闲最大存活时间
private int keepAliveSeconds = 60;
// 缓存队列容量
private int queueCapacity = 2147483647;
// 是否允许核心线程池超时等待(设置为false,当到达一定时间没有任务,线程池会自动关闭)
private boolean allowCoreThreadTimeOut = false;

private TaskDecorator taskDecorator;
private ThreadPoolExecutor threadPoolExecutor;

public ThreadPoolTaskExecutor() {

}

// 因为spring线程池提供了更加灵活的配置,项目中一般使用的是spring的线程池。

lock


前戏java里面提供了两种锁机制,基于jvm层面实现的关键字synchronized和基于jdk层面实现的Lock锁。


synchronized关键字

基本使用:synchronized关键字可以用来修饰静态方法,普通方法,代码块,获取到的锁对象分别为当前类Class对象,当前实例对象和synchronized括号里面的对象。

加锁原理:synchronized加锁是基于对象监视器Monitor的monitorenter和monitorexit。在java中,每个对象都会有一个对象监视器,当synchronized修饰代码块时,在开始位置会加上monitorenter指令,在方法结束和异常处会插入monitorexit指令。当执行monitorentrt指令时,会去获取锁的Monitor对象,若是获取到,执行,获取不到的话,线程阻塞。

synchronized的锁升级

偏向锁:当线程执行时,会去修改对象的对象头(Mark Word)中线程id,若是修改成功,执行。若是修改不成功,需要将偏向锁升级为轻量级锁。锁升级的过程是,该获取锁的线程通知Mark Word中标识的线程,使其进入暂停状态。

轻量级锁:争抢锁的线程会去将对象的对象头(Mark Word)拷贝的线程栈中,并将对象头指向该栈(cas操作),若是执行成功,获取到锁,执行代码。若是执行失败,自旋等待其它线程释放锁。

重量级锁:当自旋超过了一定的时间之后,若是还不能获取到锁,将会升级为重量级锁,线程阻塞。


Lock接口

特性 描述
尝试非阻塞的获取锁 当前线程尝试获取锁,若这一时刻锁没有被其它线程获取到,则成功获取并持有锁。
能被中断的获取锁 获取到锁的线程能够响应中断,当获取到锁的线程被中断时,中断异常将会被抛出,同时锁会被释放。
超时获取锁 在指定的时间之前获取锁,如果过了指定的时间任然无法获取到锁,则返回。
方法名称 描述
void lock() 线程获取锁,当获取到锁后,线程从该方法返回。
void lockInterruptibly() throws InteruptedExecption 与lock方法不同之处在于可以在获取锁的过程中中断当前的线程。
boolean tryLock() 尝试获取锁,获取到返回true,未获取到返回false。
boolean tryLock(long time, TimeUnit unit) throws InteruptedException; 尝试获取锁,下列三种情况下会返回:1.在指定的时间内获取到锁。2.过了超时时间为获取到锁。3.当前线程被中断。
void unlock() 释放锁
Condition newCondition() 获取等待通知组件,只有成功获取到了锁,才能创建该组件

lock和synchronized的区别

关于加锁和释放锁方式和原理的不同

  • synchronized是jvm层面提供的关键字,获取锁和释放锁不需要手工干预。synchronized获取锁时会获取锁定对象(静态方法->类对象,普通方法->当前对象,代码块->提供的对象)的对象监视器Monitor。一旦一个线程获取到这个对象的Monitor,其它线程就无法获取。但是同一个线程对这个Monitor可以多次获取(可重入)。对于锁的释放,当方法正常执行结束或者发生异常时,会释放该锁。

  • lock锁是jdk层面提供的锁,可以基于api执行上锁和解锁操作,操作更加的灵活。Lock接口实现的可重入锁ReentLock,其底层是依赖于AQS实现,AQS内部维护了一个同步队列,获取锁的线程会被加入到这个同步队列上面,等待前一个获取锁的节点释放锁时唤醒。ReentLock提供了多种获取锁的方式,可以在获取锁的时候立即响应中断。

    关于等待队列和同步队列的不同

  • synchronized基于对象监视器,调用底层的wait方法时,会将线程加入到等待队列中,内部只维护了一个等待队列,而调用notify或者notifyAll时,会将唤醒的线程加入到同步队列。

  • ReentLock内部可以创建多个等待队列,可以调用await方法将获取到锁的线程加入到等待队列,也可以调用singal,singalAll唤醒线程,将其加入到同步队列中。


http


  1. 对http协议的理解

    ​ http协议是一种基于客户端->服务器模式的协议,客户端发送请求,服务器返回响应。http协议通过uri定位访问的资源。http协议是一种无状态的协议,服务 器无法识别同一浏览器的前后两次请求(为解决无状态,浏览器端引入了cookie机制)。


  1. http响应状态码
状态码 类别 原因短语
1XX Informational(信息性状态码) 接收的请求正在处理
2XX Success(成功状态码) 请求正常处理完毕
3XX Redirection(重定向状态码) 需要进行附加操作以完成请求
4XX Client Error(客户端错误) 服务器无法处理请求
5XX Server Error (服务端错误) 服务器处理请求出错

  1. http请求首部常用字段

    Accept:用户代理可处理的媒体类型。

    Accept-Encoding:优先的内容编码。

    Accept-Language:优先的语言。

    Content-Type:实体类型。


  1. 跨域问题

io


1.在jdk1.4,java引入了nio,nio是一种非阻塞io。


  1. 在nio中有三个概念
  • 缓冲区(buffer):java nio中数据的读取和存放需要通过缓冲区。

  • 通道(channel):可以理解为io中流的概念 ,与流不同的是,一个通道中既可以进行数据的读取,也可以进行数据的写入,而在io模型中,数据的读取和写入会有专门的输入和输出流来进行操作。

  • 选择器(select):通道可以在选择器上注册相关的事件,而选择器会有一个专门的线程来负责轮询这些事件,当某个写入事件或是读取事件可写或可读时,会交给相应的线程来处理。


  1. 通过java nio模拟一个服务端-客户端通信的实例
/**
* NIO Server服务
*/
class NIOServer{
// Selector->注册channel
Selector selector = null;
// ServerSocketChannel->服务端channel,类似ServerSocket
ServerSocketChannel serverSocketChannel;
// 处理selector轮询事件
private ChannelHandle handle;
// NIO服务关闭标识
private volatile boolean stop = false;

NIOServer(int port) {
try {
selector = Selector.open();
serverSocketChannel = ServerSocketChannel.open();
// 设置channel为非阻塞模式
serverSocketChannel.configureBlocking(false);
// 为channel绑定端口
serverSocketChannel.bind(new InetSocketAddress(port));
// 将channel注册到selector上,监听连接事件
serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
handle = new ChannelHandle();
} catch (Exception e) {
e.printStackTrace();
}
}

/**
* 启动服务
* @throws IOException
*/
public void start() throws IOException {
while (!stop) {
// 获取到等待处理的IO事件数量
int readyChannels = selector.select();
// 若是等待处理的IO事件数量为0,不处理
if (readyChannels == 0) {
continue;
}
Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();
// 处理这些SelectionKey
while (iterator.hasNext()) {
// 获取到该key
SelectionKey key = iterator.next();
// 移除该key
iterator.remove();
// 分别处理各自事件
if (key.isAcceptable()) {
handle.handleAccept(key);
} else if (key.isReadable()) {
handle.handleRead(key);
} else if (key.isWritable()) {
handle.handleWrite(key);
}
}
}
}

// 停止该服务
public void stop() {
this.stop = true;
}
}


/**
* 针对selector上不同事件的处理类
*/
class ChannelHandle {
/**
* 处理连接事件
* @param key
* @throws IOException
*/
public void handleAccept(SelectionKey key) throws IOException {
ServerSocketChannel ssc = (ServerSocketChannel) key.channel();
SocketChannel sc = ssc.accept();
sc.configureBlocking(false);
sc.register(key.selector(), SelectionKey.OP_READ);
}

/**
* 处理可读事件
* @param key
* @throws IOException
*/
public void handleRead(SelectionKey key) throws IOException {
SocketChannel sc = (SocketChannel) key.channel();
// fixme 对读取到的数据进行处理->相关协议解析
sc.register(key.selector(), SelectionKey.OP_WRITE);
}

/**
* 处理可写事件
* @param key
* @throws IOException
*/
public void handleWrite(SelectionKey key) throws IOException {
System.out.println("处理写数据");
// fixme 对输出结果按照相关协议进行封装
String header = "HTTP/1.1 200 OK\r\n";
StringBuffer result = new StringBuffer(header);
result.append("Content-Type:application/json\n");
result.append("\r\n");
result.append("hello,world");
SocketChannel channel = (SocketChannel)key.channel();
ByteBuffer wrap = ByteBuffer.wrap(result.toString().getBytes());
channel.write(wrap);
channel.close();
}
}

// 调用类
public class NIOUtil {
public static void main(String[] args) {
// 启动一个基于NIO的服务
NIOServer nioServer = new NIOServer(8070);
try {
nioServer.start();
} catch (IOException e) {
e.printStackTrace();
}
}
}
  1. 下面是通过java io的方式来实现的
public class IOUtil {

public static void main(String[] args) {
IOServer.createServer();
}
}


class IOServer {

private static volatile boolean stop = false;
/**
* 开启服务
*/
static void createServer() {
ExecutorService executorService =
Executors.newFixedThreadPool(3);
ServerSocket serverSocket = null;
try {
serverSocket = new ServerSocket(8090);
System.out.println("服务器在端口8090上启动。。。");
while (!stop) {
try {
Socket socket = serverSocket.accept();
// 将任务提交给线程池来处理
executorService.execute(new SocketHandle(socket));
} catch (Exception e) {
e.printStackTrace();
break;
}
}
} catch (Exception e) {
e.printStackTrace();
} finally {
try {
if (serverSocket != null) {
serverSocket.close();
}
executorService.shutdown();
} catch (IOException e) {
e.printStackTrace();
}
}
}

/**
* 关闭服务
*/
static void stopServer() {
stop = true;
}
}


/**
* socket处理线程类
*/
class SocketHandle implements Runnable {
private Socket socket;

SocketHandle(Socket socket) {
super();
this.socket = socket;
}

@Override
public void run() {
// 处理输入
InputStream is = null;
InputStreamReader isr = null;
BufferedReader br = null;
try {
is = socket.getInputStream();
isr = new InputStreamReader(is);
br = new BufferedReader(isr);
String s = null;
while ((s = br.readLine()) != null && s.length() > 0) {
System.out.println(s);
s = null;
}
} catch (Exception e) {
e.printStackTrace();
} finally {

}
// 处理输出 \r\n 回车换行
OutputStream os = null;
try {
os = socket.getOutputStream();
os.write("HTTP/1.1 200 OK\r\n".getBytes());
os.write("Content-Type:application/json\n".getBytes());
os.write("\r\n".getBytes());
os.write("hello,world".getBytes());
os.flush();
} catch (Exception e) {
e.printStackTrace();
} finally {
if (socket != null) {
try {
socket.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
}