Oracle 高级队列(AQ)
适用对象:初步了解oracle高级队列人群
注意事项:
序号 |
注意事项 |
1 |
JMS监听部分基本参照以下网站内容: http://blog.csdn.net/scorpio3k/article/details/49406209 |
2 |
本文仅为按作者本身的项目经历编写,不包含全部oracle高级队列功能 |
目录
Oracle 高级队列(AQ). 0
一.前言... 3
二.功能概述... 3
三.创建Oracle高级队列... 4
1.Oracle高级队列所需权限... 4
2.创建队列结构—TYPE. 4
3.创建队列表... 5
4.创建队列... 5
5.队列管理... 5
6.队列创建管理完整步骤... 7
四.JMS监听并处理Oracle 高级队列... 10
1. 准备工作:. 10
2. 创建连接参数类:. 10
3. 创建消息转换类:. 10
4. 主类进行消息处理:. 11
1 前言
一般系统的应用可以分为:立即要执行和可以延迟要执行的事情,区分这个很重要。为了提高系统的性能,缩短系统等待时间,引入队列技术。
队列是一种能将应用程序的处理工作有效地划分为前台任务和后台任务的技术。当处理容量允许时,这种技术通过存储消息、确定消息处理的优先顺序和向应用程序提交消息来发挥作用。它使你能够平衡本地计算机的负荷,或将任务分配到远程计算机。
为了减少用户的等待时间,应用程序可以让说明需要后台处理的消息排入队列。然后就可以从页面的呈递过程中去掉该处理任务。由一个后台进程来读取并队列处理这些消息,或者甚至可以交由一个单独的系统来处理它们。
队列可以实现各个系统之间的数据共享,消息通信。
2 功能概述
书写本文的目的:利用Oracle高级队列实现pl/sql代码,为其它语言实现高级队列的功能作接口。
Oracle高级队列有一下好处:
(1)高级队列管理是Oracle数据库的一个特性,它提供消息队列管理功能。这是一个非常可靠、安全和可伸缩的消息管理系统,因为它使用与其他基于Oracle技术的应用程序相同的数据库特性。
(2)高级队列管理的一个很大优点是它可以通过pl/sql、java或c来访问,这样你就可以把来自一个java servlet的消息入队列和使pl/sql存储过程中的相同消息出队列。
(3)高级队列管理的另一个优点是你可以利用这一软件通过Oracle net services (sql*net)、http(s)和smtp,在远程节点之间传播消息。高级队列甚至可以通过消息网关与非Oracle的消息管理系统(如ibm mqseries)相集成。
(4)Oracle高级队列管理提供了单消费者队列和多消费者队列。单消费者队列只面向单一的接收者。多消费者队列可以被多个接收者使用。当把消息放入多消费者队列时,应用程序的程序员必须显式地在消息属性中指定这些接收者,或者建立决定每条消息的接收者的基于规则的订阅过程。
Oracle 高级队列具体开发步骤如下:
(1)首先确定应用的需求,是否适合使用高级队列?使用高级队列预计提高性能的预期值
(2)赋予数据库账户相应aq权限。
(3)确定队列包体结构,即创建type。
(4)创建队列表及队列。
(5)队列管理
3 创建Oracle高级队列
3.1 Oracle高级队列所需权限
赋予权限和角色:
grant connect, resource to账户名;
grant aq_user_role to账户名;
grant aq_administrator_role to 账户名;
grant execute on sys.dbms_aqadm to账户名;
grant execute on sys.dbms_aq to账户名;
grant execute on sys.dbms_aqin to账户名;
grant execute on sys.dbms_aqjms to账户名;
3.2 创建队列结构—TYPE
create or replace type 类型名称 as object
(
字段一 字段类型,
字段二 字段类型,
字段三 字段类型,
字段四 字段类型,
…………………………
字段n 字段类型
);
3.3 创建队列表
begin
sys.dbms_aqadm.create_queue_table
(
queue_table => '队列表名',
queue_payload_type => ' type类型', --之前定义的type类型
sort_list => 'priority,enq_time', --按优先级和入列时间排序
multiple_consumers => false, --多消费者
comment => '自己加注解',
auto_commit => false --手动控制事务
);
end;
3.4 创建队列
begin
sys.dbms_aqadm.create_queue
(
queue_name => '队列名',
queue_table => '队列表名', --之前创建的队列表
queue_type => sys.dbms_aqadm.normal_queue,
max_retries => 3, --取队列失败后重试次数
retry_delay => 1, --重试前等待
retention_time => 0 --取队列后保持时间,不保持
);
end;
3.5 队列管理
3.5.1 启动队列:
begin
sys.dbms_aqadm.start_queue
(queue_name => '队列名',enqueue => true ,dequeue => true );
end;
3.5.2 插入队列:
declare
v_message 队列类型(type);
v_msgid raw(16);
v_options dbms_aq.enqueue_options_t;
v_properties dbms_aq.message_properties_t;
v_recipients dbms_aq.aq$_recipient_list_t;
begin
v_message := task_c(字段一 => 字符串,
字段二 => 字符串,
字段三 => 字符串,
字段四 => 字符串,
………………………
字段n => 字符串);
v_properties.priority :=数字; --该消息的优先级别,默认为1
v_options.visibility := dbms_aq.immediate; --立即入列
dbms_aq.enqueue(queue_name => '队列名',
enqueue_options => v_options,
message_properties => v_properties,
payload => v_message,
msgid => v_msgid);
end;
3.5.3 暂停队列:
begin
sys.dbms_aqadm.stop_queue ( queue_name => '队列名');
end;
3.5.4 删除队列:
begin
sys.dbms_aqadm.drop_queue ( queue_name => '队列名');
end;
3.5.5 删除队列表:
begin
sys.dbms_aqadm.drop_queue_table (queue_table => '队列表名');
end;
3.6 队列创建管理完整步骤
在aquser账户中创建高级队列,高级队列结构为callid,msg_id,report_time,sms_report,respurl,send_times,队列名称为sms_queue。
开发步骤:
(1)赋予权限:
grant connect, resource to aquser;
grant aq_user_role to aquser;
grant aq_administrator_role to aquser;
grant execute on sys.dbms_aqadm to aquser;
grant execute on sys.dbms_aq to aquser;
grant execute on sys.dbms_aqin to aquser;
grant execute on sys.dbms_aqjms to aquser;
(2)创建队列结构(type):
create or replace type sms_queue_type as object
(
callid varchar2(1024),
msg_id varchar2(1024),
report_time varchar2(1024),
sms_report varchar2(1024),
respurl varchar2(1024),
send_times varchar2(1024)
);
(3)创建队列表:
begin
sys.dbms_aqadm.create_queue_table
(
queue_table => 'sms_queue_table',
queue_payload_type => 'sms_queue_type',
sort_list => 'priority,enq_time',
multiple_consumers => false,
comment => 'queue for test',
auto_commit => false
);
end;
(4)创建队列:
begin
sys.dbms_aqadm.create_queue
(
queue_name => 'sms_queue',
queue_table => 'sms_queue_table',
queue_type => sys.dbms_aqadm.normal_queue,
max_retries => 3,
retry_delay => 1,
retention_time => 0
);
end;
(5)启动队列:
begin
sys.dbms_aqadm.start_queue
(queue_name => 'sms_queue',enqueue => true ,dequeue => true );
end;
(6)建立入队存储:
CREATE OR REPLACE procedure sms_enqueue(in_callid varchar2,
in_msg_id varchar2,
in_report_time varchar2,
in_sms_report varchar2,
in_respurl varchar2,
in_send_times number,
out_result out varchar2,
out_sqlerrm out varchar2) as
/*声明变量*/
v_Message SMS_QUEUE_TYPE;
v_MsgId RAW(16);
v_options DBMS_AQ.ENQUEUE_OPTIONS_T;
v_properties DBMS_AQ.MESSAGE_PROPERTIES_T;
v_Recipients DBMS_AQ.AQ$_RECIPIENT_LIST_T;
v_sqlerrm varchar2(512);
begin
/*对列字段赋值*/
v_Message := SMS_QUEUE_TYPE(callid => in_callid,
msg_id => in_msg_id,
report_time => in_report_time,
sms_report => in_sms_report,
respurl => in_respurl,
send_times => in_send_times);
/*让消息立即进入队列*/
v_options.visibility := DBMS_AQ.IMMEDIATE;
/*入队操作*/
dbms_aq.enqueue(queue_name => 'sms_queue',
enqueue_options => v_options,
message_properties => v_properties,
payload => v_Message,
msgid => v_MsgId);
/*异常处理*/
exception
WHEN OTHERS THEN
v_sqlerrm := SQLERRM;
out_result := '-1';
out_sqlerrm := v_sqlerrm;
end;
(6)删除队列:
begin
sys.dbms_aqadm.stop_queue ( queue_name => 'sms_queue');
end;
begin
sys.dbms_aqadm.drop_queue ( queue_name => 'sms_queue');
end;
begin
sys.dbms_aqadm.drop_queue_table (queue_table => 'sms_queue_table');
end;
drop type voicechange.sms_queue_type;
4 JMS监听并处理Oracle 高级队列
4.1 准备工作:
Java使用JMS进行相应的处理,需要使用Oracle提供的jar,在Oracle安装目录可以找到:在linux中可以使用find命令进行查找,例如:find `pwd` -name 'jmscommon.jar'
需要的jar为:
app/oracle/product/12.1.0/dbhome_1/rdbms/jlib/jmscommon.jar
app/oracle/product/12.1.0/dbhome_1/jdbc/lib/ojdbc7.jar
app/oracle/product/12.1.0/dbhome_1/jlib/orai18n.jar
app/oracle/product/12.1.0/dbhome_1/jlib/jta.jar
app/oracle/product/12.1.0/dbhome_1/rdbms/jlib/aqapi_g.jar
4.2 创建连接参数类:
实际使用时可以把参数信息配置在properties文件中,使用spring进行注入。
package org.kevin.jms;
c class JmsConfig
{
public String username = "数据库账户名";
public String password = "数据库账户密码";
public String jdbcUrl = "jdbc:oracle:thin:@数据库TNS:端口号:名称";
public String queueName = "监听的队列名称";
}
4.3 创建消息转换类:
package org.kevin.jms;
import java.sql.SQLException;
import oracle.jdbc.driver.OracleConnection;
import oracle.jdbc.internal.OracleTypes;
import oracle.jpub.runtime.MutableStruct;
import oracle.sql.CustomDatum;
import oracle.sql.CustomDatumFactory;
import oracle.sql.Datum;
import oracle.sql.STRUCT;
@SuppressWarnings("deprecation")
public class QUEUE_MESSAGE_TYPE implements CustomDatum, CustomDatumFactory {
public static final String _SQL_NAME = "QUEUE_MESSAGE_TYPE";
public static final int _SQL_TYPECODE = OracleTypes.STRUCT;
MutableStruct _struct;
// 12表示字符串
static int[] _sqlType = { 12 };
static CustomDatumFactory[] _factory = new CustomDatumFactory[1];
static final QUEUE_MESSAGE_TYPE _MessageFactory = new QUEUE_MESSAGE_TYPE();
public static CustomDatumFactory getFactory() {
return _MessageFactory;
}
public QUEUE_MESSAGE_TYPE() {
_struct = new MutableStruct(new Object[1], _sqlType, _factory);
}
public Datum toDatum(OracleConnection c) throws SQLException {
return _struct.toDatum(c, _SQL_NAME);
}
public CustomDatum create(Datum d, int sqlType) throws SQLException {
if (d == null)
return null;
QUEUE_MESSAGE_TYPE o = new QUEUE_MESSAGE_TYPE();
o._struct = new MutableStruct((STRUCT) d, _sqlType, _factory);
return o;
}
public String getContent() throws SQLException {
return (String) _struct.getAttribute(0);
}
}
4.4 主类进行消息处理:
package org.kevin.jms;
import java.util.Properties;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.Queue;
import javax.jms.QueueConnection;
import javax.jms.QueueConnectionFactory;
import javax.jms.Session;
import oracle.jms.AQjmsAdtMessage;
import oracle.jms.AQjmsDestination;
import oracle.jms.AQjmsFactory;
import oracle.jms.AQjmsSession;
public class Main {
public static void main(String[] args) throws Exception {
JmsConfig config = new JmsConfig();
QueueConnectionFactory queueConnectionFactory = AQjmsFactory.getQueueConnectionFactory(config.jdbcUrl,
new Properties());
QueueConnection conn = queueConnectionFactory.createQueueConnection(config.username, config.password);
AQjmsSession session = (AQjmsSession) conn.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
conn.start();
Queue queue = (AQjmsDestination) session.getQueue(config.username, config.queueName);
MessageConsumer consumer = session.createConsumer(queue, null, QUEUE_MESSAGE_TYPE.getFactory(), null, false);
consumer.setMessageListener(new MessageListener() {
@Override
public void onMessage(Message message) {
System.out.println("ok");
AQjmsAdtMessage adtMessage = (AQjmsAdtMessage) message;
try {
QUEUE_MESSAGE_TYPE payload = (QUEUE_MESSAGE_TYPE) adtMessage.getAdtPayload();
System.out.println(payload.getContent());
} catch (Exception e) {
e.printStackTrace();
}
}
});
Thread.sleep(1000000);
}
}