百度360必应搜狗淘宝本站头条
当前位置:网站首页 > IT技术 > 正文

仓颉编程实练-并发-使用ArrayBlockingQueue实现工作者模式

wptr33 2025-09-19 03:55 3 浏览

main.cj:

package demo1

import std.collection.ArrayList
import std.collection.concurrent.*;
import std.sync.*

struct Task {
    public Task(let id: UInt64) {}
}

main(): Int64 {
    let taskQueue = ArrayBlockingQueue<Task>(10)
    let queueIsEnd = AtomicBool(false)

    let tList = ArrayList<Future<Unit>>()
    for (_ in 0..3) {
        let t = spawn {
            while(true) {
                if (queueIsEnd.load()) {
                    println("queue is end, thread end.")
                    break
                }

                let op = taskQueue.remove(2 * Duration.second) // 读取任务,并设置2s超时
                match (op) {
                    case Some(task) => 
                        let taskId = task.id
                        if (taskId == 0) { // 获得结尾空任务
                            queueIsEnd.store(true)
                            println("got empty task: ${taskId}, thread end.")
                            break
                        }
                        println("Worker 执行任务: ${taskId}")
                    case None => 
                        println("超时,继续等待")
                }
            }
        }
        tList.add(t)
    }

    for (i in 1..10) {
        taskQueue.add(Task(UInt64(i)))
    }
    taskQueue.add(Task(0)) // 放入结尾空任务

    for (t in tList) {
        t.get()
    }

    println("main thread done.")
    return 0
}

运行:

> cjpm run
Worker 执行任务: 1
Worker 执行任务: 3
Worker 执行任务: 5Worker 执行任务: 4
Worker 执行任务: 6

Worker 执行任务: 2
Worker 执行任务: 9
got empty task: 0, thread end.      
Worker 执行任务: 8
queue is end, thread end.
Worker 执行任务: 7
queue is end, thread end.
main thread done.

cjpm run finished

说明:

  1. 实现了并发的工作者模式,主线程向任务队列中添加任务,并发worker线程消费这些任务,直到所有任务被处理完毕,所有的worker线程收到通知后退出。
  2. 使用ArrayBlockingQueue进行线程间通信,worker读取时使用超时机制
  3. 增加一个空任务,并使用AtomicBool(原子操作)作为线程间通知任务队列已经为空的标志

可以对比一下golang的类似实现:

package main

import (
	"fmt"
	"sync"
)

// Task 表示要执行的任务
type Task struct {
	ID int
}

func main() {
	taskQueue := make(chan Task, 10)

	var wg sync.WaitGroup

	// 并发的worker
	for i := 1; i <= 3; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()

			for task := range taskQueue {
				fmt.Printf("Worker 执行任务 %d\n", task.ID)
			}
		}()
	}

	// 添加一些任务到任务队列中
	for i := 1; i <= 10; i++ {
		task := Task{ID: i}
		taskQueue <- task
	}

	// 关闭任务队列,等待所有工作者完成任务
	close(taskQueue)
	wg.Wait()
	println("main thread done.")
}

相关推荐

高性能并发队列Disruptor使用详解

基本概念Disruptor是一个高性能的异步处理框架,是一个轻量的Java消息服务JMS,能够在无锁的情况下实现队列的并发操作Disruptor使用环形数组实现了类似队列的功能,并且是一个有界队列....

Disruptor一个高性能队列_java高性能队列

Disruptor一个高性能队列前言说到队列比较熟悉的可能是ArrayBlockingQueue、LinkedBlockingQueue这两个有界队列,大多应用在线程池中使用能保证线程安全,但其安全性...

谈谈防御性编程_防御性策略

防御性编程对于程序员来说是一种良好的代码习惯,是为了保护自己的程序在不可未知的异常下,避免带来更大的破坏性崩溃,使得程序在错误发生时,依然能够云淡风轻的处理,但很多程序员入行很多年,写出的代码依然都是...

有人敲门,开水开了,电话响了,孩子哭了,你先顾谁?

前言哎呀,这种情况你肯定遇到过吧!正在家里忙活着,突然——咚咚咚有人敲门,咕噜咕噜开水开了,铃铃铃电话响了,哇哇哇孩子又哭了...我去,四件事一起来,人都懵了!你说先搞哪个?其实这跟我们写Java多线...

面试官:线程池如何按照core、max、queue的执行顺序去执行?

前言这是一个真实的面试题。前几天一个朋友在群里分享了他刚刚面试候选者时问的问题:"线程池如何按照core、max、queue的执行循序去执行?"。我们都知道线程池中代码执行顺序是:co...

深入剖析 Java 中线程池的多种实现方式

在当今高度并发的互联网软件开发领域,高效地管理和利用线程资源是提升程序性能的关键。Java作为一种广泛应用于后端开发的编程语言,为我们提供了丰富的线程池实现方式。今天,就让我们深入探讨Java中...

并发编程之《彻底搞懂Java线程》_java多线程并发解决方案详解

目录引言一、核心概念:线程是什么?...

Redis怎么实现延时消息_redis实现延时任务

一句话总结Redis可通过有序集合(ZSET)实现延时消息:将消息作为value,到期时间戳作为score存入ZSET。消费者轮询用ZRANGEBYSCORE获取到期消息,配合Lua脚本保证原子性获取...

CompletableFuture真的用对了吗?盘点它最容易被误用的5个场景

在Java并发编程中,CompletableFuture是处理异步任务的利器,但不少开发者在使用时踩过这些坑——线上服务突然雪崩、异常悄无声息消失、接口响应时间翻倍……本文结合真实案例,拆解5个最容易...

接口性能优化技巧,有点硬_接口性能瓶颈

背景我负责的系统到2021年初完成了功能上的建设,开始进入到推广阶段。随着推广的逐步深入,收到了很多好评的同时也收到了很多对性能的吐槽。刚刚收到吐槽的时候,我们的心情是这样的:...

禁止使用这5个Java类,每一个背后都有一段&quot;血泪史&quot;

某电商平台的支付系统突然报警:大量订单状态异常。排查日志发现,同一笔订单被重复支付了三次。事后复盘显示,罪魁祸首竟是一行看似无害的SimpleDateFormat代码。在Java开发中,这类因使用不安...

无锁队列Disruptor原理解析_无锁队列实现原理

队列比较队列...

Java并发队列与容器_java 并发队列

【前言:无论是大数据从业人员还是Java从业人员,掌握Java高并发和多线程是必备技能之一。本文主要阐述Java并发包下的阻塞队列和并发容器,其实研读过大数据相关技术如Spark、Storm等源码的,...

线程池工具及拒绝策略的使用_线程池处理策略

线程池的拒绝策略若线程池中的核心线程数被用完且阻塞队列已排满,则此时线程池的资源已耗尽,线程池将没有足够的线程资源执行新的任务。为了保证操作系统的安全,线程池将通过拒绝策略处理新添加的线程任务。...

【面试题精讲】ArrayBlockingQueue 和 LinkedBlockingQueue 区别?

有的时候博客内容会有变动,首发博客是最新的,其他博客地址可能会未同步,认准...