这期内容当中小编将会给大家带来有关如何用源码分析canal的deployer模块,文章内容丰富且以专业的角度为大家分析和叙述,阅读完这篇文章希望大家可以有所收获。
CanalLauncher是启动入口类
获取canal.properties配置文件
如果canal.properties配置文件中属性root.admin.manager有值,那么构造PlainCanalConfigClient,调用PlainCanalConfigClient的findServer获取PlainCanal,调用PlainCanal的getProperties方法获取properties
通过properties构造 CanalStarter并调用其start方法
CanalStarter是启动类
public synchronized void start() throws Throwable {
//首先根据canal.serverMode构造CanalMQProducer,如果是kafka,构造的是CanalKafkaProducer
String serverMode = CanalController.getProperty(properties, CanalConstants.CANAL_SERVER_MODE);
if (serverMode.equalsIgnoreCase("kafka")) {
canalMQProducer = new CanalKafkaProducer();
} else if (serverMode.equalsIgnoreCase("rocketmq")) {
canalMQProducer = new CanalRocketMQProducer();
}
if (canalMQProducer != null) {
// disable netty
System.setProperty(CanalConstants.CANAL_WITHOUT_NETTY, "true");
// 设置为raw避免ByteString->Entry的二次解析
System.setProperty("canal.instance.memory.rawEntry", "false");
}
//接下来构造CanalController并调用其start方法
logger.info("## start the canal server.");
controller = new CanalController(properties);
controller.start();
logger.info("## the canal server is running now ......");
...
//构造CanalMQStarter并调用其start方法,同时设置为CanalController的属性
if (canalMQProducer != null) {
canalMQStarter = new CanalMQStarter(canalMQProducer);
MQProperties mqProperties = buildMQProperties(properties);
String destinations = CanalController.getProperty(properties, CanalConstants.CANAL_DESTINATIONS);
canalMQStarter.start(mqProperties, destinations);
controller.setCanalMQStarter(canalMQStarter);
}
...
running = true;
}
CanalController是实例调度控制器
public CanalController(final Properties properties){
// 初始化managerClients用于请求admin
managerClients = MigrateMap.makeComputingMap(new Function<String, PlainCanalConfigClient>() {
public PlainCanalConfigClient apply(String managerAddress) {
return getManagerClient(managerAddress);
}
});
// 初始化全局参数设置,包含了全局mode、lazy、managerAddress、springXml,初始化instanceGenerator用于创建instance,其根据InstanceConfig的mode值使用PlainCanalInstanceGenerator或者SpringCanalInstanceGenerator创建CanalInstance
globalInstanceConfig = initGlobalConfig(properties);
instanceConfigs = new MapMaker().makeMap();
// 初始化instance config,包含了实例mode、lazy、managerAddress、springXml
initInstanceConfig(properties);
...
// 初始化CanalServerWithEmbedded,将instanceGenerator设置为CanalServerWithEmbedded的属性
embededCanalServer = CanalServerWithEmbedded.instance();
embededCanalServer.setCanalInstanceGenerator(instanceGenerator);// 设置自定义的instanceGenerator
int metricsPort = Integer.valueOf(getProperty(properties, CanalConstants.CANAL_METRICS_PULL_PORT, "11112"));
embededCanalServer.setMetricsPort(metricsPort);
this.adminUser = getProperty(properties, CanalConstants.CANAL_ADMIN_USER);
this.adminPasswd = getProperty(properties, CanalConstants.CANAL_ADMIN_PASSWD);
embededCanalServer.setUser(getProperty(properties, CanalConstants.CANAL_USER));
embededCanalServer.setPasswd(getProperty(properties, CanalConstants.CANAL_PASSWD));
...
final String zkServers = getProperty(properties, CanalConstants.CANAL_ZKSERVERS);
//初始化ZkClientx用于canal集群部署,创建/otteradmin/canal/destinations节点和/otteradmin/canal/cluster节点
if (StringUtils.isNotEmpty(zkServers)) {
zkclientx = ZkClientx.getZkClient(zkServers);
// 初始化系统目录
zkclientx.createPersistent(ZookeeperPathUtils.DESTINATION_ROOT_NODE, true);
zkclientx.createPersistent(ZookeeperPathUtils.CANAL_CLUSTER_ROOT_NODE, true);
}
// 初始化ServerRunningMonitors的ServerRunningMonitor,用于启动、关闭实例
final ServerRunningData serverData = new ServerRunningData(registerIp + ":" + port);
ServerRunningMonitors.setServerData(serverData);
ServerRunningMonitors.setRunningMonitors(MigrateMap.makeComputingMap(new Function<String, ServerRunningMonitor>() {
...
}));
// 初始化InstanceAction,用于启动和关闭实例
autoScan = BooleanUtils.toBoolean(getProperty(properties, CanalConstants.CANAL_AUTO_SCAN));
if (autoScan) {
defaultAction = new InstanceAction() {
...
};
// 初始化instanceConfigMonitors,用于获取所有instanceConfig并启动所有instance
instanceConfigMonitors = MigrateMap.makeComputingMap(new Function<InstanceMode, InstanceConfigMonitor>() {
public InstanceConfigMonitor apply(InstanceMode mode) {
...
}
});
}
}
ManagerInstanceConfigMonitor是实例扫描器
public void start() {
super.start();
//启动定时任务,定时扫描所有instance
executor.scheduleWithFixedDelay(new Runnable() {
public void run() {
try {
scan();
if (isFirst) {
isFirst = false;
}
} catch (Throwable e) {
logger.error("scan failed", e);
}
}
}, 0, scanIntervalInSecond, TimeUnit.SECONDS);
}
private void scan() {
//缓存了所有instance的配置,如果发现有新的instance则启动或者修改了instance则重启
String instances = configClient.findInstances(null);
final List<String> is = Lists.newArrayList(StringUtils.split(instances, ','));
List<String> start = Lists.newArrayList();
List<String> stop = Lists.newArrayList();
List<String> restart = Lists.newArrayList();
for (String instance : is) {
if (!configs.containsKey(instance)) {
PlainCanal newPlainCanal = configClient.findInstance(instance, null);
if (newPlainCanal != null) {
configs.put(instance, newPlainCanal);
start.add(instance);
}
} else {
PlainCanal plainCanal = configs.get(instance);
PlainCanal newPlainCanal = configClient.findInstance(instance, plainCanal.getMd5());
if (newPlainCanal != null) {
// 配置有变化
restart.add(instance);
configs.put(instance, newPlainCanal);
}
}
}
configs.forEach((instance, plainCanal) -> {
if (!is.contains(instance)) {
stop.add(instance);
}
});
stop.forEach(instance -> {
notifyStop(instance);
});
restart.forEach(instance -> {
notifyReload(instance);
});
start.forEach(instance -> {
notifyStart(instance);
});
}
private void notifyStart(String destination) {
try {
//启动instance调用InstanceAction启动实例,最后是调用ServerRunningMonitor启动实例
defaultAction.start(destination);
actions.put(destination, defaultAction);
// 启动成功后记录配置文件信息
} catch (Throwable e) {
logger.error(String.format("scan add found[%s] but start failed", destination), e);
}
}
ServerRunningMonitor是针对server的running实例控制
public ServerRunningMonitor(){
// 创建父节点
dataListener = new IZkDataListener() {
public void handleDataChange(String dataPath, Object data) throws Exception {
MDC.put("destination", destination);
ServerRunningData runningData = JsonUtils.unmarshalFromByte((byte[]) data, ServerRunningData.class);
if (!isMine(runningData.getAddress())) {
mutex.set(false);
}
if (!runningData.isActive() && isMine(runningData.getAddress())) { // 说明出现了主动释放的操作,并且本机之前是active
releaseRunning();// 彻底释放mainstem
}
activeData = (ServerRunningData) runningData;
}
public void handleDataDeleted(String dataPath) throws Exception {
MDC.put("destination", destination);
mutex.set(false);
if (!release && activeData != null && isMine(activeData.getAddress())) {
// 如果上一次active的状态就是本机,则即时触发一下active抢占
initRunning();
} else {
// 否则就是等待delayTime,避免因网络瞬端或者zk异常,导致出现频繁的切换操作
delayExector.schedule(new Runnable() {
public void run() {
initRunning();
}
}, delayTime, TimeUnit.SECONDS);
}
}
};
}
public synchronized void start() {
super.start();
try {
//首先调用listener的processStart方法
processStart();
if (zkClient != null) {
// 如果需要尽可能释放instance资源,不需要监听running节点,不然即使stop了这台机器,另一台机器立马会start
// 监视/otteradmin/canal/destinations/{0}/running节点变化
String path = ZookeeperPathUtils.getDestinationServerRunning(destination);
zkClient.subscribeDataChanges(path, dataListener);
initRunning();
} else {
processActiveEnter();// 没有zk,直接启动
}
} catch (Exception e) {
logger.error("start failed", e);
// 没有正常启动,重置一下状态,避免干扰下一次start
stop();
}
}
private void processStart() {
if (listener != null) {
try {
//processStart方法中创建/otteradmin/canal/destinations/{0}/cluster/{1}节点,0是实例名称,1是当前节点ip:port
listener.processStart();
} catch (Exception e) {
logger.error("processStart failed", e);
}
}
}
private void initRunning() {
if (!isStart()) {
return;
}
String path = ZookeeperPathUtils.getDestinationServerRunning(destination);
// 序列化
byte[] bytes = JsonUtils.marshalToByte(serverData);
try {
mutex.set(false);
//尝试创建/otteradmin/canal/destinations/{0}/running节点
zkClient.create(path, bytes, CreateMode.EPHEMERAL);
activeData = serverData;
//如果成功则调用listener的processEnter方法,processEnter方法中调用CanalServerWithEmbedded的start方法启动实例和CanalMQStarter的start方法启动实例
processActiveEnter();// 触发一下事件
mutex.set(true);
release = false;
} catch (ZkNodeExistsException e) {
bytes = zkClient.readData(path, true);
if (bytes == null) {// 如果不存在节点,立即尝试一次
initRunning();
} else {
activeData = JsonUtils.unmarshalFromByte(bytes, ServerRunningData.class);
}
} catch (ZkNoNodeException e) {
zkClient.createPersistent(ZookeeperPathUtils.getDestinationPath(destination), true); // 尝试创建父节点
initRunning();
}
}
canal.properties配置
canal.register.ip =
canal.admin.manager = 127.0.0.1:8089
canal.admin.port = 11110
canal.admin.user = admin
canal.admin.passwd = 4ACFE3202A5FF5CF467898FC58AAB1D615029441
canal.admin.register.auto = true
canal.admin.register.cluster =
上述就是小编为大家分享的如何用源码分析canal的deployer模块了,如果刚好有类似的疑惑,不妨参照上述分析进行理解。如果想知道更多相关知识,欢迎关注亿速云行业资讯频道。
亿速云「云服务器」,即开即用、新一代英特尔至强铂金CPU、三副本存储NVMe SSD云盘,价格低至29元/月。点击查看>>
免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。
原文链接:https://my.oschina.net/u/3769440/blog/4758303