前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Halcon 消息队列

Halcon 消息队列

作者头像
为为为什么
发布2023-03-10 21:13:42
6340
发布2023-03-10 21:13:42
举报
文章被收录于专栏:又见苍岚又见苍岚

之前我们介绍了 消息队列,本文介绍 Halcon 消息队列的用法。

消息队列

我们可以把消息队列比作是一个存放消息的容器,当我们需要使用消息的时候可以取出消息供自己使用。 具体内容详见 消息队列

Halcon 实现

创建消息队列

核心函数 create_message_queue

代码语言:javascript
复制
create_message_queue( : : : QueueHandle)

该函数创建一个新的空的消息队列,输出参数 QueueHandle 是指向新建消息队列的句柄。

消息队列被设计成 FIFO 管道,在不同的线程之间安全地传递任意集合的数据。

队列访问在内部是完全同步的,不需要从应用程序进行显式锁定。数据在所谓的消息中通过队列传输。

多个生产者线程可以同时添加数据(enqueue_message) ,而多个消费者线程可以同时提取数据(dequeue_message)。

所有排队的消息都由 enqueue_message 操作复制。因此,原始消息可以在 enqueue_message 调用之后立即重用,而不会影响到排队副本。

创建消息

核心函数 create_message:

代码语言:javascript
复制
create_message( : : : MessageHandle)

create_message 创建一个新的空消息。输出参数 MessageHandle 是新创建的消息的句柄,用于在使用该消息的任何后续运算符调用中标识该消息。消息充当类似字典的容器,可以使用异步消息队列在应用程序的线程之间传递。

消息可以存储任意数量的条目,每个条目都有其唯一的键(字符串或整数)和相关的值。每个键可以引用控件参数元组,也可以引用图标对象。这些数据分别使用 set_message_tupleset_message_obj 存储到消息中,在这里可以使用 get_message_tupleget_message_obj 再次检索这些数据。

存储在消息中的控件参数元组始终是原始数据的深度副本。因此,可以在 set_message_tuple 调用之后立即重用原始数据,而不会影响消息。值得注意的例外是句柄: 在消息中存储任何句柄都会复制句柄值,但不会复制句柄后面的资源。

设置消息内容

核心函数 set_message_obj , set_message_tuple:

代码语言:javascript
复制
set_message_obj(ObjectData : : MessageHandle, Key : )
set_message_tuple( : : MessageHandle, Key, TupleData : )

两个函数分别可以将 objtuple 数据添加进消息句柄中,使用时类似于字典的取值用法;添加进消息的数据是原始数据的深拷贝。

添加消息进队列

核心函数 enqueue_message

代码语言:javascript
复制
enqueue_message( : : QueueHandle, MessageHandle, GenParamName, GenParamValue : )

enqueue_message 将一个或多个消息排队到由 QueueHandle 参数表示的消息队列。任何线程都可以使用 dequeue_message 从队列中检索排队的消息。

多个生产者(排队)线程和多个使用者(排队)线程可以同时共享相同的队列。消息按先进先出(FIFO)顺序传递。即使多个使用者线程正在使用队列,每条消息也只传递一次。

队列访问在内部是完全同步的,不需要外部锁定。如果队列为空,并且在 dequeue_message 中至少有一个使用者线程在等待消息数据,那么其中一个线程将被成功的 enqueue_message 调用唤醒,并立即传递加入队列的消息数据。否则,消息数据将异步附加到队列中,以便在使用者线程准备好再次取消消息数据队列时立即传递。

所有排队的消息(MessageHandle)都由 enqueue_message 操作复制。因此,原始消息可以在 enqueue_message 调用之后立即重用,而不会影响到排队副本。

操作符参数 GenParamNameGenParamValue 保留在操作符接口中以供将来使用,目前不支持通用参数。

如果在操作之后,队列中包含的消息数量大于最大队列长度 set_message_queue_param 的参数值max_message_num 所指定的最大数量,会抛出 H_ERR_MQOVL 错误。

从队列中取走消息

核心函数 dequeue_message

代码语言:javascript
复制
dequeue_message( : : QueueHandle, GenParamName, GenParamValue : MessageHandle)

Dequeue_messageQueueHandle 参数表示的消息队列中排出消息。消息必须由使用 enqueue_message 的任何线程排队。

消息按先进先出(FIFO)顺序传递,每条消息只传递一次。如果队列不是空的,dequeue_message 将立即从队列传递最早的消息。此消息将从队列中删除,并在 MessageHandle 输出参数中返回该消息的句柄。消息数据所有权从消息队列传输(不复制)到新创建的消息句柄。

如果队列为空,dequeue_message 将阻塞,直到消息可以传递(另一个线程使用 enqueue_message 添加消息)。

可以使用 get_message_tupleget_message_objget_message_param 查询存储在消息中的数据。

通过 dequeue_message 获得的消息句柄可以进一步重用(修改和/或排队到另一个消息队列)。

如果使用一个 enqueue_message 调用对多个消息进行排队,那么所有这些消息也将通过一个 dequeue_message 调用一起检索,并通过 MessageHandle 元组传递多个消息句柄。

队列访问在内部是完全同步的,不需要外部锁定。

在应用程序重新配置或清理期间,可能需要唤醒在 dequeue_message 中等待消息的线程。这可以通过使用带有参数 abort_dequeue 的运算符 set_message_queue_param 来实现。在这种情况下,当前阻塞的 dequeue_message 调用立即使用 H_ERR_MQCNCL 返回。

最后,可以通过 GenParamNameGenParamValue 中的通用参数来调整 dequeue_message 行为。目前,只支持一个通用参数 —— timeout

如果队列为空,则超时控制操作员在等待消息时将阻塞多长时间。过期时,操作符返回H_ERR_TIMEOUT。超时可以指定为整数或双精度值(以秒为单位) ,也可以指定为字符串infinite表示永远等消息。

代码语言:javascript
复制
dequeue_message (MessageQueue, 'timeout', 'infinite', MessageData)

如果未指定超时,则默认使用无限超时,这意味着操作符将阻塞,直到新消息进入队列或操作中止。

获取消息内容

核心函数 get_message_paramget_message_objget_message_tuple

代码语言:javascript
复制
get_message_param( : : MessageHandle, GenParamName, Key : GenParamValue)
get_message_obj( : ObjectData : MessageHandle, Key : )
get_message_tuple( : : MessageHandle, Key : TupleData)

get_message_objget_message_tuple 算子从指定 MessageHandle 的消息中提取 key 值对应的内容,得到 TupleData 数据。提取的数据必须是通过 set_message 系列算子设置的。

get_message_param 算子查询消息参数的当前值或有关消息状态的其他信息。

可以查询的值包括:

key

含义

message_keys

查询存储在消息中的所有键,不管它们是与元组数据还是对象数据相关联。键列表通过 GenParamValue 以字符串元组的形式报告。对于此查询,参数 Key 必须是空元组。

key_exists

如果给定的密钥存储在消息中,则报告1,否则报告0。结果通过 GenParamValue 报告,每个键一个值。

key_data_type

报告与消息中的 tuple 数据关联的键的 tuple(可以使用 get_message_tuple 检索数据)。为与对象数据关联的键报告“object(可以使用 get_message_obj 检索数据)。结果通过 GenParamValue 报告,每个键一个值。这个参数有助于动态决定是使用 get_message_tuple 还是 get_message_obj 来获取特定键的数据。

获取队列参数

核心函数 get_message_queue_param :

代码语言:javascript
复制
get_message_queue_param( : : QueueHandle, GenParamName : GenParamValue)

get_message_queue_param 查询消息队列参数的当前值或有关队列状态的其他信息。可以通过一个 get_message_queue_param 调用执行多个查询,将多个参数名传递给参数 GenParamName。在 GenParamValue,参数值的返回顺序与调用者请求的参数名相同。

当前支持查询的属性:

key

含义

is_empty

如果队列为空,则返回1,否则返回0。

message_num

返回队列中当前存储的消息数。

max_message_num

返回消息队列的“ max_message_num”参数的当前值,因为它是使用 set_message_queue_param 设置的。默认值 -1表示没有限制。

Halcon 生产者消费者示例

Halcon 自带例程 examples/hdevelop/System/Multithreading/message_queue_producer_consumer.hdev

我模仿实现了简化版的内容:

  • 主程序
代码语言:javascript
复制
set_system ('parallelize_operators', 'false')  * 关闭 AOP
   count_seconds (T1)
   thread_num := 12
   create_message_queue (image_path_queue)		* 数据队列,包含结束处理线程
   create_message_queue (finish_queue)			* 结果队列,检查工作是否完成
   
   for ThreadIndex := 0 to thread_num - 1 by 1
       par_start<VProcThreads.at(ThreadIndex)> : processing_thread (DarkImage, SubImage, golden_img, image_height, image_width, Mean, nccModel, image_path_queue, finish_queue)	* 工消费者线程,创建后阻塞从数据队列中提取数据
   endfor
   
   par_start<AcqThread> : acquisition_thread (ImageFiles, data_size, image_path_queue, finish_queue, thread_num) 	* 生产者线程,向队列中不断添加数据
   
   NumStartedThreads := thread_num + 1
   NumFinishedThreads := 0
   
   while (1)
       * Read next message with results from the queue.
       dequeue_message (finish_queue, 'timeout', 'infinite', MessageResult) 	* 查看消费者完成工作的情况
       * 
       * Here, the message can be either a real collection of results processing
       * a single input image - or a special message informing about completion
       * of one of the worker threads.
       get_message_param (MessageResult, 'key_exists', 'thread_finished', ThreadEndInfo)
       if (ThreadEndInfo[0])
           * Worker-thread-finish message, just increment the
           * finished-thread counter.
           get_message_tuple (MessageResult, 'thread_finished', ThreadEndMessage)
           NumFinishedThreads := NumFinishedThreads + 1
       endif
       if (NumFinishedThreads == NumStartedThreads) * 所有消费者都完成了工作
           break
       endif
   endwhile
   count_seconds (T2)
   producer_consumer_without_aop_time := (T2-T1) * 1000
  • 生产者代码
代码语言:javascript
复制
for Index := 0 to data_size - 1 by 1

    image_path := image_path_list[Index]
    * 
    * Create a message and store the acquired image path in it
    create_message (MessageImgPath)

    set_message_tuple (MessageImgPath, 'path', image_path)
    * 
    * Send the message to the "image" queue, which is used to deliver
    * input images from the acquisition thread to the multiple
    * processing threads.
    enqueue_message (image_path_queue, MessageImgPath, [], [])
    * 
    * Pretend non-trivial acquisition time
    wait_seconds (0.01)
endfor

create_message (MessageStop)
set_message_tuple (MessageStop, 'stop_processing', 1)
for Index := 1 to worker_num by 1
    enqueue_message (image_path_queue, MessageStop, [], [])
endfor

create_message (MessageFinished)
set_message_tuple (MessageFinished, 'thread_finished', 'Acquisition thread finished')
enqueue_message (finish_queue, MessageFinished, [], [])

return ()
  • 消费者代码
代码语言:javascript
复制
while (1)
    dequeue_message (image_path_queue, 'timeout', 'infinite', MessageData)
    
    get_message_param (MessageData, 'key_exists', 'stop_processing', StopProcInfo)
    if (StopProcInfo[0])
        break
    endif
    
    get_message_tuple (MessageData, 'path', image_path)
    test_run (DarkImage, SubImage, golden_img, image_path, image_height, image_width, Mean, nccModel, result)
endwhile

create_message (MessageFinished)
set_message_tuple (MessageFinished, 'thread_finished', 'Processing finished')
enqueue_message (finish_queue, MessageFinished, [], [])

return ()

实测下来提升吞吐量 3-4 倍。

参考资料

本文参与?腾讯云自媒体分享计划,分享自作者个人站点/博客。
原始发表:2023年3月9日,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客?前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与?腾讯云自媒体分享计划? ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 消息队列
  • Halcon 实现
    • 创建消息队列
      • 创建消息
        • 设置消息内容
          • 添加消息进队列
            • 从队列中取走消息
              • 获取消息内容
                • 获取队列参数
                • Halcon 生产者消费者示例
                • 参考资料
                相关产品与服务
                消息队列 CMQ 版
                消息队列 CMQ 版(TDMQ for CMQ,简称 TDMQ CMQ 版)是一款分布式高可用的消息队列服务,它能够提供可靠的,基于消息的异步通信机制,能够将分布式部署的不同应用(或同一应用的不同组件)中的信息传递,存储在可靠有效的 CMQ 队列中,防止消息丢失。TDMQ CMQ 版支持多进程同时读写,收发互不干扰,无需各应用或组件始终处于运行状态。
                领券
                问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档
                http://www.vxiaotou.com