ConnectionManager
Openfire Connection Manager 是 Openfire 服务器的扩展,它可以透明的处理大规模并发 XMPP 客户端对Openfire 服务器的联接。根据 Connection Manager 主页介绍,每个 Connection Manager 可以处理至少5000个并发用户。
Openfire 服务器可以连接一个或多个 Connection Manager 。每个 Connection Manager 可以处理一个或多个支持 XMPP 的客户端。为 Openfire 配置 Connection Manager 后,支持 XMPP 的客户端仅需要连接 Connection Manager ,由 Connection Manager 负责管理对来自 XMPP 客户端对 Openfire 服务器的连接。下图是Openfire 服务器配置 Connection Manager 后的系统架构。
Ps:第一节是无耻的拷贝。
安装与使用
1.服务器设置:
2.下载功能包,connection_manager_3_6_3.tar.gz,解压至任意目录CM_PATH;
3.设置CM配置文件,CM_PATH/connection_manager/conf/manager.xml,修改各路属性如下:
- domain :域名,如果DNS不支持(本文的实验环境就是这样),添加到/etc/hosts 列表,使填写的值能够ping通,如图本次添加的值为”bfc-27”;
- hostname:在DNS不支持的情况下,写入IP地址,其他情况不写也行;
- 端口号和密码同服务器设置保持一致;
- Manager下面的name属性是显示在服务器上的CM名称,不填写的话会随即生成,注意不能包含下划线;
4.运行CM,CM_PATH/connection_manager/bin/cmanager.sh脚本,可能无运行权限,加好权限即可;
此时刷新服务器链接管理器页面可以看到:
5.登录,在支持XMPP的客户端,填写帐号密码,服务器设置为CM的地址,登录后,刷新服务器链接管理器页面可以看到客户端数量有所增加。
运行原理
1.下面草草的分析下CM的运行原理,既然是使用脚本启动的,那么从脚本开始:
前面一大坨基本是各种环境变量的设置,最后一两句切入主题,lib目录下的startup.jar包,被执行,从jar包目录很容易找到启动类所在,从官网下载源码文件:connection_manager_src_3_6_3.tar.gz;
2.打开源码,在上面的包路径下找到启动类ServerStarter.java,就会发现眼熟的main函数、start函数、解压包操作和类加载操作,最后一步主函数毫不隐讳的加载了核心类ConnectionManager:
ClassLoader loader = new JiveClassLoader(parent, libDir);
Thread.currentThread().setContextClassLoader(loader);
Class containerClass = loader.loadClass(
"org.jivesoftware.multiplexer.ConnectionManager");
containerClass.newInstance();
3.从启动器跳入到ConnectionManager类,从器包含的方法和行为方式来看倒是和openfire中的XMPPServer有几分相似,构造器中调用start函数,start函数基本上做了三件事:
public void start() {
try {
//第一件事初始化
initialize();
// If the server has already been setup then we can start all the server's modules
if (!setupMode) {
// 第二件事初始化并执行了Module
startModules();
}
//第三件事打印启动服务信息
// Log that the server has been started
…
}
catch (Exception e) {
//异常处理
…
}
}
主要操作在第二件事中,CM只有一个表面上看不是Module的Module——ServerSurrogate,
代理服务器,这个类管理了线程池,整理和派发客户端信息与主服务器链接交互。
private void startModules() {
serverSurrogate = new ServerSurrogate();
serverSurrogate.start();
String localIPAddress;
// Setup port info
try {
localIPAddress = InetAddress.getLocalHost().getHostAddress();
}
catch (UnknownHostException e) {
localIPAddress = "Unknown";
}
// Start process that checks health of socket connections
SocketSendingTracker.getInstance().start();
// Check if we need to configure MINA to use Direct or Heap Buffers
// Note: It has been reported that heap buffers are 50% faster than direct buffers
if (!JiveGlobals.getBooleanProperty("xmpp.socket.directBuffer", false)) {
ByteBuffer.setUseDirectBuffers(false);
ByteBuffer.setAllocator(new SimpleByteBufferAllocator());
}
// Start the port listener for clients
startClientListeners(localIPAddress);
// Start the port listener for secured clients
startClientSSLListeners(localIPAddress);
// Start http bind listener
startHttpBindServlet();
}
从代码上可以看到前半段基本是针对ServerSurrogate进行初始化和启动,后半段则是开启各路监听接口,这个玩法又同openfire中的ConnectionManagerImpl相似了,其中值得注意的是客户端监听器startClientListeners :
private void startClientListeners(String localIPAddress) {
…
// Create SocketAcceptor with correct number of processors
socketAcceptor = buildSocketAcceptor();
…
try {
…
// Start accepting connections
socketAcceptor.bind(new InetSocketAddress(bindInterface, port), new ClientConnectionHandler());
…
}
catch (Exception e) {
…
}
}
可以看到openfire中熟悉的SocketAcceptor,绑定IO端口与处理handler的操作,当然CM与openfire同是基于MINA框架的,所以它们的信息处理方式基本一致。
从网络上找到一张CM的信息流程简图,可以大致说明CM的架构与信息流向:
服务器与CM
CM与服务器可以是多对一的关系,并且我们在服务器网页上可以看到服务器上连接的每个CM的信息,即openfire存在获取连接在它上面的CM信息的接口,试着找出它。
在浏览器openfire连接管理页面查询源代码,可以得到:
找到网页对应的jsp代码文件openfire/src/web/connection-managers-settings.jsp ,可以找到如下代码:
<%
ConnectionMultiplexerManager multiplexerManager = ConnectionMultiplexerManager.getInstance();
SessionManager sessionManager = SessionManager.getInstance();
Collection<String> connectionManagers = multiplexerManager.getMultiplexers();
if (connectionManagers.isEmpty()) {
%>
<tr>
<td width="100%" colspan="3" align="center" nowrap><fmt:message key="connection-manager.details.no-managers-connected" />
</td>
</tr>
<% } else {
for (String managerName : connectionManagers) {
List<ConnectionMultiplexerSession> sessions = sessionManager.getConnectionMultiplexerSessions(managerName);
if (sessions.isEmpty()) {
continue;
}
String hostAddress = sessions.get(0).getHostAddress();
String hostName = sessions.get(0).getHostName();
%>
相关类ConnectionMultiplexerManager、SessionManager,与相关方法:
Collection<String> connectionManagers = multiplexerManager.getMultiplexers();
——
/**
* Returns the names of the connected connection managers to this server.
* 获取所有的链接管理器域名集合
* @return the names of the connected connection managers to this server.
*/
public Collection<String> getMultiplexers() {
return sessionsByManager.keySet();
}
/**
* Map that keeps track of connection managers and hosted sessions.
* Key: Domain of connection manager; Value: Map with Key: stream ID; Value: Client *session
*/
private Map<String, Map<String, LocalClientSession>> sessionsByManager =
new ConcurrentHashMap<String, Map<String, LocalClientSession>>();
所以,connectionManagers这个集合像是当前服务器上所有CM对应的域名,那么再看jsp代码中下面取CM信息的for循环代码,就是以域名为参数,进而取所有Session信息的:
for (String managerName : connectionManagers) {
List<ConnectionMultiplexerSession> sessions = sessionManager.getConnectionMultiplexerSessions(managerName);
…
}
那么对于SessionManager,它的getConnectionMultiplexerSessions(String)函数又是如何实现的呢?
/**
* Returns a collection with all the sessions originated from the connection manager
* whose domain matches the specified domain. If there is no connection manager with
* the specified domain then an empty list is going to be returned.
*
* @param domain the domain of the connection manager.
* @return a collection with all the sessions originated from the connection manager
* whose domain matches the specified domain.
*/
public List<ConnectionMultiplexerSession> getConnectionMultiplexerSessions(String domain) {
List<ConnectionMultiplexerSession> sessions = new ArrayList<ConnectionMultiplexerSession>();
// Add sessions of CMs connected to this JVM
//添加JVM里面所有的CM链接对应的Session
for (String address : localSessionManager.getConnnectionManagerSessions().keySet()) {
JID jid = new JID(address);
if (domain.equals(jid.getDomain())) {
sessions.add(localSessionManager.getConnnectionManagerSessions().get(address));
}
}
// Add sessions of CMs connected to other cluster nodes
//添加其他集群节点中的CM链接包含的Session
RemoteSessionLocator locator = server.getRemoteSessionLocator();
if (locator != null) {
for (Map.Entry<String, byte[]> entry : multiplexerSessionsCache.entrySet()) {
if (!server.getNodeID().equals(entry.getValue())) {
JID jid = new JID(entry.getKey());
if (domain.equals(jid.getDomain())) {
sessions.add(
locator.getConnectionMultiplexerSession(entry.getValue(), new JID(entry.getKey())));
}
}
}
}
return sessions;
}
相关的变量定义:
SessionManager:
/**
* Cache (unlimited, never expire) that holds sessions of connection managers. For each
* socket connection of the CM to the server there is going to be an entry in the cache.
* Key: full address of the CM that identifies the socket, Value: nodeID<br>
* 缓存(不受限,永不失效),保持连接管理器的会话。对于每个CM与服务器之间的socket
* 链接,都将会有一个实体保存在缓存中。
* key: 能够识别出socket的CM全地址 value: 节点ID
*/
private Cache<String, byte[]> multiplexerSessionsCache;
LocalSessionManager:
/**
* Map of connection multiplexer sessions grouped by connection managers. Each connection
* manager may have many connections to the server (i.e. connection pool). All connections
* originated from the same connection manager are grouped as a single entry in the map.
* Once all connections have been closed users that were logged using the connection manager
* will become unavailable.
*/
private Map<String, LocalConnectionMultiplexerSession> connnectionManagerSessions =
new ConcurrentHashMap<String, LocalConnectionMultiplexerSession>();
综上可知,以上工具类的内部都记录了CM域名、节点ID、Session的映射关系,也从侧面反映了ConnectionMultiplexerManager系的信息流程处理应当是openfire中处理CM的信息流程。