Talk about CanalAdapterService of canal

order

This paper focuses on the CanalAdapterService of canal

CanalAdapterService

canal-1.1.4/client-adapter/launcher/src/main/java/com/alibaba/otter/canal/adapter/launcher/loader/CanalAdapterService.java

@Component
@RefreshScope
public class CanalAdapterService {

    private static final Logger logger  = LoggerFactory.getLogger(CanalAdapterService.class);

    private CanalAdapterLoader  adapterLoader;

    @Resource
    private ContextRefresher    contextRefresher;

    @Resource
    private AdapterCanalConfig  adapterCanalConfig;
    @Resource
    private Environment         env;

    // Injection bean ensures priority registration
    @Resource
    private SpringContext       springContext;
    @Resource
    private SyncSwitch          syncSwitch;

    private volatile boolean    running = false;

    @PostConstruct
    public synchronized void init() {
        if (running) {
            return;
        }
        try {
            logger.info("## start the canal client adapters.");
            adapterLoader = new CanalAdapterLoader(adapterCanalConfig);
            adapterLoader.init();
            running = true;
            logger.info("## the canal client adapters are running now ......");
        } catch (Exception e) {
            logger.error("## something goes wrong when starting up the canal client adapters:", e);
        }
    }

    @PreDestroy
    public synchronized void destroy() {
        if (!running) {
            return;
        }
        try {
            running = false;
            logger.info("## stop the canal client adapters");

            if (adapterLoader != null) {
                adapterLoader.destroy();
                adapterLoader = null;
            }
            for (DruidDataSource druidDataSource : DatasourceConfig.DATA_SOURCES.values()) {
                try {
                    druidDataSource.close();
                } catch (Exception e) {
                    logger.error(e.getMessage(), e);
                }
            }
            DatasourceConfig.DATA_SOURCES.clear();
        } catch (Throwable e) {
            logger.warn("## something goes wrong when stopping canal client adapters:", e);
        } finally {
            logger.info("## canal client adapters are down.");
        }
    }
}
  • CanalAdapterService provides init and destroy methods; its init method will use adapterCanalConfig to create CanalAdapterLoader, and then execute adapterLoader.init(); its destroy method will execute adapterLoader.destroy(), and then traverse DatasourceConfig.DATA_SOURCES to execute druidDataSource.close(), and finally clear DatasourceConfig.DATA_SOURCES

CanalAdapterLoader

canal-1.1.4/client-adapter/launcher/src/main/java/com/alibaba/otter/canal/adapter/launcher/loader/CanalAdapterLoader.java

public class CanalAdapterLoader {

    private static final Logger                     logger        = LoggerFactory.getLogger(CanalAdapterLoader.class);

    private CanalClientConfig                       canalClientConfig;

    private Map<String, CanalAdapterWorker>         canalWorkers  = new HashMap<>();

    private Map<String, AbstractCanalAdapterWorker> canalMQWorker = new HashMap<>();

    private ExtensionLoader<OuterAdapter>           loader;

    public CanalAdapterLoader(CanalClientConfig canalClientConfig){
        this.canalClientConfig = canalClientConfig;
    }

    /**
     * Initialize canal client
     */
    public void init() {
        loader = ExtensionLoader.getExtensionLoader(OuterAdapter.class);

        String canalServerHost = this.canalClientConfig.getCanalServerHost();
        SocketAddress sa = null;
        if (canalServerHost != null) {
            String[] ipPort = canalServerHost.split(":");
            sa = new InetSocketAddress(ipPort[0], Integer.parseInt(ipPort[1]));
        }
        String zkHosts = this.canalClientConfig.getZookeeperHosts();

        if ("tcp".equalsIgnoreCase(canalClientConfig.getMode())) {
            // Initializing the adapter for the canal client
            for (CanalClientConfig.CanalAdapter canalAdapter : canalClientConfig.getCanalAdapters()) {
                List<List<OuterAdapter>> canalOuterAdapterGroups = new ArrayList<>();

                for (CanalClientConfig.Group connectorGroup : canalAdapter.getGroups()) {
                    List<OuterAdapter> canalOutConnectors = new ArrayList<>();
                    for (OuterAdapterConfig c : connectorGroup.getOuterAdapters()) {
                        loadAdapter(c, canalOutConnectors);
                    }
                    canalOuterAdapterGroups.add(canalOutConnectors);
                }
                CanalAdapterWorker worker;
                if (sa != null) {
                    worker = new CanalAdapterWorker(canalClientConfig,
                        canalAdapter.getInstance(),
                        sa,
                        canalOuterAdapterGroups);
                } else if (zkHosts != null) {
                    worker = new CanalAdapterWorker(canalClientConfig,
                        canalAdapter.getInstance(),
                        zkHosts,
                        canalOuterAdapterGroups);
                } else {
                    throw new RuntimeException("No canal server connector found");
                }
                canalWorkers.put(canalAdapter.getInstance(), worker);
                worker.start();
                logger.info("Start adapter for canal instance: {} succeed", canalAdapter.getInstance());
            }
        } else if ("kafka".equalsIgnoreCase(canalClientConfig.getMode())) {
            // Initializing the adapter for canal client Kafka
            for (CanalClientConfig.CanalAdapter canalAdapter : canalClientConfig.getCanalAdapters()) {
                for (CanalClientConfig.Group group : canalAdapter.getGroups()) {
                    List<List<OuterAdapter>> canalOuterAdapterGroups = new ArrayList<>();
                    List<OuterAdapter> canalOuterAdapters = new ArrayList<>();
                    for (OuterAdapterConfig config : group.getOuterAdapters()) {
                        loadAdapter(config, canalOuterAdapters);
                    }
                    canalOuterAdapterGroups.add(canalOuterAdapters);

                    CanalAdapterKafkaWorker canalKafkaWorker = new CanalAdapterKafkaWorker(canalClientConfig,
                        canalClientConfig.getMqServers(),
                        canalAdapter.getInstance(),
                        group.getGroupId(),
                        canalOuterAdapterGroups,
                        canalClientConfig.getFlatMessage());
                    canalMQWorker.put(canalAdapter.getInstance() + "-kafka-" + group.getGroupId(), canalKafkaWorker);
                    canalKafkaWorker.start();
                    logger.info("Start adapter for canal-client mq topic: {} succeed",
                        canalAdapter.getInstance() + "-" + group.getGroupId());
                }
            }
        } else if ("rocketMQ".equalsIgnoreCase(canalClientConfig.getMode())) {
            // Initializing the adapter for canal client rocketmq
            for (CanalClientConfig.CanalAdapter canalAdapter : canalClientConfig.getCanalAdapters()) {
                for (CanalClientConfig.Group group : canalAdapter.getGroups()) {
                    List<List<OuterAdapter>> canalOuterAdapterGroups = new ArrayList<>();
                    List<OuterAdapter> canalOuterAdapters = new ArrayList<>();
                    for (OuterAdapterConfig config : group.getOuterAdapters()) {
                        loadAdapter(config, canalOuterAdapters);
                    }
                    canalOuterAdapterGroups.add(canalOuterAdapters);
                    CanalAdapterRocketMQWorker rocketMQWorker = new CanalAdapterRocketMQWorker(canalClientConfig,
                        canalClientConfig.getMqServers(),
                        canalAdapter.getInstance(),
                        group.getGroupId(),
                        canalOuterAdapterGroups,
                        canalClientConfig.getAccessKey(),
                        canalClientConfig.getSecretKey(),
                        canalClientConfig.getFlatMessage(),
                        canalClientConfig.isEnableMessageTrace(),
                        canalClientConfig.getCustomizedTraceTopic(),
                        canalClientConfig.getAccessChannel(),
                        canalClientConfig.getNamespace());
                    canalMQWorker.put(canalAdapter.getInstance() + "-rocketmq-" + group.getGroupId(), rocketMQWorker);
                    rocketMQWorker.start();

                    logger.info("Start adapter for canal-client mq topic: {} succeed",
                        canalAdapter.getInstance() + "-" + group.getGroupId());
                }
            }
        }
    }

    private void loadAdapter(OuterAdapterConfig config, List<OuterAdapter> canalOutConnectors) {
        try {
            OuterAdapter adapter;
            adapter = loader.getExtension(config.getName(), StringUtils.trimToEmpty(config.getKey()));

            ClassLoader cl = Thread.currentThread().getContextClassLoader();
            // Replace ClassLoader
            Thread.currentThread().setContextClassLoader(adapter.getClass().getClassLoader());
            Environment env = (Environment) SpringContext.getBean(Environment.class);
            Properties evnProperties = null;
            if (env instanceof StandardEnvironment) {
                evnProperties = new Properties();
                for (PropertySource<?> propertySource : ((StandardEnvironment) env).getPropertySources()) {
                    if (propertySource instanceof EnumerablePropertySource) {
                        String[] names = ((EnumerablePropertySource<?>) propertySource).getPropertyNames();
                        for (String name : names) {
                            Object val = propertySource.getProperty(name);
                            if (val != null) {
                                evnProperties.put(name, val);
                            }
                        }
                    }
                }
            }
            adapter.init(config, evnProperties);
            Thread.currentThread().setContextClassLoader(cl);
            canalOutConnectors.add(adapter);
            logger.info("Load canal adapter: {} succeed", config.getName());
        } catch (Exception e) {
            logger.error("Load canal adapter: {} failed", config.getName(), e);
        }
    }

    /**
     * Destroy all adapters to prevent too many cannal instances from blocking the destruction, and destroy them in parallel
     */
    public void destroy() {
        if (!canalWorkers.isEmpty()) {
            ExecutorService stopExecutorService = Executors.newFixedThreadPool(canalWorkers.size());
            for (CanalAdapterWorker canalAdapterWorker : canalWorkers.values()) {
                stopExecutorService.execute(canalAdapterWorker::stop);
            }
            stopExecutorService.shutdown();
            try {
                while (!stopExecutorService.awaitTermination(1, TimeUnit.SECONDS)) {
                    // ignore
                }
            } catch (InterruptedException e) {
                // ignore
            }
        }

        if (!canalMQWorker.isEmpty()) {
            ExecutorService stopMQWorkerService = Executors.newFixedThreadPool(canalMQWorker.size());
            for (AbstractCanalAdapterWorker canalAdapterMQWorker : canalMQWorker.values()) {
                stopMQWorkerService.execute(canalAdapterMQWorker::stop);
            }
            stopMQWorkerService.shutdown();
            try {
                while (!stopMQWorkerService.awaitTermination(1, TimeUnit.SECONDS)) {
                    // ignore
                }
            } catch (InterruptedException e) {
                // ignore
            }
        }
        logger.info("All canal adapters destroyed");
    }
}
  • The init method of the CanalAdapterLoader obtains the loader of the OuterAdapter through the ExtensionLoader.getExtensionLoader(OuterAdapter.class), and then performs different initialization according to different canalClientConfig.getMode(). There are three modes: tcp, kafka and rocketMQ. The tcp mode creates the CanalAdapterWorker, and then adds it to the canalWorkers, and then executes the worker.start method; the kafka mode Create CanalAdapterKafkaWorker, and then execute the worker.start method after adding it to canalMQWorker; rocketMQ creates CanalAdapterRocketMQWorker, and then execute the worker.start method after adding it to canalMQWorker; its destroy method will traverse canalWorkers.values() and canalMQWorker.values(), and submit asynchronous thread to execute its stop method

Summary

CanalAdapterService provides init and destroy methods; its init method will use adapterCanalConfig to create CanalAdapterLoader, and then execute adapterLoader.init(); its destroy method will execute adapterLoader.destroy(), and then traverse DatasourceConfig.DATA_SOURCES to execute druidDataSource.close(), and finally clear DatasourceConfig.DATA_SOURCES

doc

Tags: Programming kafka Java

Posted on Sun, 05 Apr 2020 21:06:06 -0700 by dagee