实际应用

文章目录
  1. list实现分布式队列
  2. zset实现延时队列
list实现分布式队列
// 需要的依赖jedis fastjson
<dependency>
<groupId>redis.clients</groupId>
<artifactId>jedis</artifactId>
<version>2.9.0</version>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.4</version>
</dependency>
// 测试类
/**
 * Demo entry point for the list-backed Redis queue: starts one producer thread and one
 * consumer thread that operate on the same Redis list.
 *
 * @author long
 * @since 2019/9/8
 */
public class App {
    private static final String url = "192.168.25.150";
    private static final int port = 6379;

    /**
     * Wires up the queue, producer and consumer, then starts both threads.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        // Key of the Redis list that backs the queue.
        String queueKey = "message_queue";
        // Message converter handed to the queue constructors.
        MessageConvert<TaskItem> convert = new MessageConvert<>();
        // A Jedis instance wraps a single connection and is not thread-safe, so the
        // producer and consumer threads each get their own connection. Both queues
        // address the same Redis key, so they still form one logical queue.
        RedisQueue producerQueue = new RedisQueue(new Jedis(url, port), queueKey, convert);
        RedisQueue consumerQueue = new RedisQueue(new Jedis(url, port), queueKey, convert);
        // Producer
        MessageProducer producer = new MessageProducer(producerQueue);
        // Consumer
        MessageConsumer consumer = new MessageConsumer(consumerQueue);
        producer.start();
        consumer.start();
    }
}

// 消息队列
/**
 * Distributed message queue implemented on top of a Redis list.
 *
 * <p>Messages are pushed with {@code LPUSH} and taken with {@code RPOP} on the same key.
 * All calls go through the single {@link Jedis} instance passed to the constructor; this
 * class adds no synchronization of its own, so an instance should be used by one thread.
 *
 * @author long
 * @since 2019/9/8
 */
public class RedisQueue {

    /** Pause, in milliseconds, before the single retry performed by {@link #poll(boolean)}. */
    private static final long RETRY_DELAY_MILLIS = 500;

    private final Jedis jedis;
    private final String queueKey;

    /**
     * @param jedis          connection used for every queue operation
     * @param queueKey       key of the Redis list backing the queue
     * @param messageConvert accepted for API compatibility; currently not used by this class
     */
    public RedisQueue(Jedis jedis, String queueKey, MessageConvert<?> messageConvert) {
        this.jedis = jedis;
        this.queueKey = queueKey;
    }

    /**
     * Sends a message by pushing it onto the list.
     *
     * @param message serialized message
     */
    public void push(String message) {
        this.jedis.lpush(queueKey, message);
    }

    /**
     * Takes one message from the list, optionally retrying once after a short pause.
     *
     * @param isRetry when {@code true} and the first attempt finds nothing, wait
     *                {@value #RETRY_DELAY_MILLIS} ms and try one more time
     * @return the message, or {@code null} if none was available (or the wait was interrupted)
     */
    public String poll(boolean isRetry) {
        String message = jedis.rpop(queueKey);
        if (message != null || !isRetry) {
            return message;
        }
        try {
            TimeUnit.MILLISECONDS.sleep(RETRY_DELAY_MILLIS);
        } catch (InterruptedException e) {
            // Keep the interrupt visible to the caller instead of swallowing it, and
            // skip the retry so an interrupted thread can shut down promptly.
            Thread.currentThread().interrupt();
            return null;
        }
        return jedis.rpop(queueKey);
    }
}

/**
 * Message producer thread: sends ten messages to a {@link RedisQueue}, pausing 30 ms
 * after each one.
 *
 * @author long
 * @since 2019/9/8
 */
public class MessageProducer extends Thread {
    // Target queue.
    private final RedisQueue queue;
    // Message converter used to serialize tasks before sending.
    private static final MessageConvert<TaskItem> convert = new MessageConvert<>();

    /**
     * @param queue queue that receives the produced messages
     */
    public MessageProducer(RedisQueue queue) {
        this.queue = queue;
    }

    /**
     * Sends {@code "message:0"} through {@code "message:9"}. Stops early if the thread
     * is interrupted while pausing between messages.
     */
    @Override
    public void run() {
        for (int i = 0; i < 10; i++) {
            send("message:" + i);
            try {
                TimeUnit.MILLISECONDS.sleep(30);
            } catch (InterruptedException e) {
                // Restore the flag and stop producing rather than ignoring the request.
                Thread.currentThread().interrupt();
                return;
            }
        }
    }

    /**
     * Wraps the text in a {@code TaskItem} with a random UUID as id, serializes it and
     * pushes it onto the queue.
     *
     * @param message message text
     */
    private void send(String message) {
        TaskItem item = new TaskItem();
        item.id = UUID.randomUUID().toString();
        item.msg = message;
        // Convert to a string before sending.
        String sendMess = convert.messageToString(item);
        queue.push(sendMess);
    }
}

/**
 * Message consumer thread: repeatedly polls a {@link RedisQueue} and prints every message
 * it receives, until the thread is interrupted.
 *
 * @author long
 * @since 2019/9/8
 */
public class MessageConsumer extends Thread {
    private final RedisQueue queue;

    /**
     * @param queue queue to consume from
     */
    public MessageConsumer(RedisQueue queue) {
        this.queue = queue;
    }

    /**
     * Poll loop. Sleeps a random 0-9 ms between polls and exits once the thread has
     * been interrupted, so the consumer can be shut down with {@link Thread#interrupt()}.
     */
    @Override
    public void run() {
        Random random = new Random();
        while (!Thread.currentThread().isInterrupted()) {
            try {
                TimeUnit.MILLISECONDS.sleep(random.nextInt(10));
            } catch (InterruptedException e) {
                // Restore the flag and leave the loop instead of spinning forever.
                Thread.currentThread().interrupt();
                return;
            }
            receive();
        }
    }

    /** Polls once (with retry enabled) and prints the message if one was available. */
    private void receive() {
        String message = queue.poll(true);
        if (message != null) {
            System.out.println(message);
        }
    }
}

/**
 * Message converter: serializes objects of type {@code T} to JSON strings and parses
 * JSON strings back.
 *
 * @param <T> message type
 * @author long
 * @since 2019/9/8
 */
public class MessageConvert<T> {
    // Target type handed to the JSON parser in stringToObject.
    private final Type taskType;

    /**
     * Creates a converter whose target type is taken from {@code TypeReference<T>}.
     *
     * <p>Because {@code T} is a type variable here, the reference captures only the
     * variable itself, not the caller's actual type argument. Serialization via
     * {@link #messageToString(Object)} is unaffected.
     * NOTE(review): {@link #stringToObject(String)} then presumably yields a generic JSON
     * object rather than an instance of the intended class — confirm against the fastjson
     * version in use, and prefer {@link #MessageConvert(Class)} when parsing is needed.
     */
    public MessageConvert() {
        this.taskType = new TypeReference<T>() {}.getType();
    }

    /**
     * Creates a converter that parses messages into the given concrete class.
     *
     * @param targetType class that {@link #stringToObject(String)} should produce
     * @throws NullPointerException if {@code targetType} is {@code null}
     */
    public MessageConvert(Class<T> targetType) {
        if (targetType == null) {
            throw new NullPointerException("targetType");
        }
        this.taskType = targetType;
    }

    /**
     * Converts the given message to its JSON string form.
     *
     * @param t message object
     * @return JSON representation of {@code t}
     */
    public String messageToString(T t) {
        return JSON.toJSONString(t);
    }

    /**
     * Parses a JSON string using this converter's target type.
     *
     * @param message JSON text
     * @return the parsed object
     */
    public T stringToObject(String message) {
        return JSON.parseObject(message, taskType);
    }
}
zset实现延时队列
// 测试方法
/**
 * Demo of the zset-backed delay queue: runs one producer and one consumer against the
 * key {@code "q-demo"}, waits for the producer to finish, gives the consumer six seconds
 * to drain the queue, then interrupts it and waits for it to exit.
 */
private static void delayQueue() {
    String queueKey = "q-demo";
    // A Jedis instance wraps a single connection and is not thread-safe, so each thread
    // works on its own connection. try-with-resources closes both when the demo ends.
    try (Jedis producerJedis = new Jedis(url, port);
         Jedis consumerJedis = new Jedis(url, port)) {
        DelayMessageConsumer consumer =
                new DelayMessageConsumer(new RedisDelayingQueue(consumerJedis, queueKey));
        DelayMessageProducer producer =
                new DelayMessageProducer(new RedisDelayingQueue(producerJedis, queueKey));
        producer.start();
        consumer.start();
        try {
            // Let the producer thread finish first.
            producer.join();
            // Sleep six seconds so the consumer can finish consuming the messages.
            Thread.sleep(6000);
            consumer.interrupt();
            consumer.join();
        } catch (InterruptedException e) {
            // Demo aborted: ask the consumer to stop and keep the interrupt visible to
            // the caller. The connections are closed right after, without waiting.
            consumer.interrupt();
            Thread.currentThread().interrupt();
        }
    }
}

/**
 * Delay queue implemented on top of a Redis sorted set (zset).
 *
 * <p>Messages are members of the set. {@link #loop(MessageHandle)} only picks members
 * whose score lies in {@code [0, System.currentTimeMillis()]}, so a score acts as the
 * epoch-millisecond time at which the message becomes due.
 *
 * @author long
 * @since 2019/9/7
 */
public class RedisDelayingQueue {

    /** Pause, in milliseconds, when no message is currently due. */
    private static final long IDLE_SLEEP_MILLIS = 500;

    private final Jedis jedis;
    private final String queueKey;

    /**
     * @param jedis    connection used for every queue operation
     * @param queueKey key of the sorted set backing the queue
     */
    public RedisDelayingQueue(Jedis jedis, String queueKey) {
        this.jedis = jedis;
        this.queueKey = queueKey;
    }

    /**
     * Sends a message by adding it to the sorted set.
     *
     * @param msg   serialized message
     * @param score score stored with the message; {@link #loop(MessageHandle)} compares it
     *              against the current time in milliseconds
     */
    public void delay(String msg, long score) {
        jedis.zadd(queueKey, score, msg);
    }

    /**
     * Processes due messages until the calling thread is interrupted.
     *
     * <p>The thread's interrupt status is left set when this method returns.
     *
     * @param handle callback invoked with each message this caller removed from the set
     */
    public void loop(MessageHandle handle) {
        while (!Thread.currentThread().isInterrupted()) {
            // FIXME: fetch and remove are two separate commands, so they are not atomic.
            // Fetch at most one member whose score is within [0, now].
            Set<String> values =
                    jedis.zrangeByScore(queueKey, 0, System.currentTimeMillis(), 0, 1);
            // Nothing due yet: sleep briefly before checking again.
            if (values.isEmpty()) {
                try {
                    Thread.sleep(IDLE_SLEEP_MILLIS);
                } catch (InterruptedException e) {
                    // Restore the flag so callers can still observe the interrupt.
                    Thread.currentThread().interrupt();
                    break;
                }
                continue;
            }
            String value = values.iterator().next();
            // Hand the message to the callback only if this call actually removed it.
            if (jedis.zrem(queueKey, value) > 0) {
                handle.handle(value);
            }
        }
    }
}

/**
 * Producer thread for the delay queue: sends ten messages, each with a small random delay.
 *
 * @author long
 * @since 2019/9/8
 */
public class DelayMessageProducer extends Thread {

    RedisDelayingQueue queue;

    /**
     * @param queue delay queue that receives the produced messages
     */
    public DelayMessageProducer(RedisDelayingQueue queue) {
        this.queue = queue;
    }

    /** Sends {@code "message:0"} through {@code "message:9"}, each delayed by a random 0-9 ms. */
    @Override
    public void run() {
        Random random = new Random();
        for (int i = 0; i < 10; i++) {
            String message = "message:" + i;
            long delayMillis = random.nextInt(10);
            send(message, delayMillis);
        }
    }

    /**
     * Wraps the text in a {@code TaskItem} with a random UUID as id, serializes it to JSON
     * and enqueues it so that it becomes due {@code delay} milliseconds from now.
     *
     * @param msg   message text
     * @param delay delay relative to now, in milliseconds
     */
    private void send(String msg, long delay) {
        TaskItem task = new TaskItem();
        task.id = UUID.randomUUID().toString();
        task.msg = msg;
        String message = JSON.toJSONString(task);
        // RedisDelayingQueue.loop selects members with score <= System.currentTimeMillis(),
        // so the score must be an absolute due time, not the bare relative delay.
        queue.delay(message, System.currentTimeMillis() + delay);
    }
}

/**
 * Consumer thread for the delay queue. Runs the queue's processing loop with a handler
 * that prints every message to standard output.
 *
 * @author long
 * @since 2019/9/8
 */
public class DelayMessageConsumer extends Thread {

    /** Shared handler: writes each received message to {@code System.out}. */
    private static final MessageHandle handle = new MessageHandle() {
        @Override
        public void handle(String message) {
            System.out.println(message);
        }
    };

    RedisDelayingQueue queue;

    /**
     * @param queue delay queue to consume from
     */
    public DelayMessageConsumer(RedisDelayingQueue queue) {
        this.queue = queue;
    }

    /** Delegates to the queue's loop, passing the printing handler. */
    @Override
    public void run() {
        queue.loop(handle);
    }
}

/**
 * Callback interface for message processing.
 *
 * @author long
 * @since 2019/9/8
 */
public interface MessageHandle {

/**
 * Processes one message.
 *
 * @param item the message, as a string
 */
void handle(String item);
}