A look at getTransactionRows in Maxwell's BinlogConnectorReplicator

Preface

This article looks at the getTransactionRows method of BinlogConnectorReplicator.

BinlogConnectorReplicator

maxwell-1.25.1/src/main/java/com/zendesk/maxwell/replication/BinlogConnectorReplicator.java

public class BinlogConnectorReplicator extends RunLoopProcess implements Replicator {

	//......

	/**
	 * Collects the rows of one transaction into a {@link RowMapBuffer}.
	 *
	 * <p>Events are polled in a loop until a commit event is seen, at which point the
	 * buffer is returned.  Row events contribute {@link RowMap}s to the buffer (subject
	 * to the output filters), TABLE_MAP events populate the table cache, ROWS_QUERY events
	 * supply the SQL text attached to the following row event, and QUERY events are either
	 * processed (CREATE TABLE), ignored, or logged as unhandled.
	 *
	 * @param beginEvent the event that opened the transaction; its header timestamp is the
	 *                   start point for the transaction execution-time metric
	 * @return the buffer holding every row accepted for output; the last row is flagged as
	 *         the transaction commit, and the xid is set when the commit event is an XID event
	 * @throws Exception propagated from event polling and event processing
	 */
	private RowMapBuffer getTransactionRows(BinlogConnectorEvent beginEvent) throws Exception {
		BinlogConnectorEvent event;
		RowMapBuffer buffer = new RowMapBuffer(MAX_TX_ELEMENTS, this.bufferMemoryUsage);

		// SQL text from the most recent ROWS_QUERY event; consumed by the next row event
		String currentQuery = null;

		while ( true ) {
			event = pollEvent();

			if (event == null) {
				// nothing available yet: check on the replicator thread, then poll again
				ensureReplicatorThread();
				continue;
			}

			EventType eventType = event.getEvent().getHeader().getEventType();
			if (event.isCommitEvent()) {
				if (!buffer.isEmpty()) {
					// mark the final row as the commit and record per-transaction metrics
					buffer.getLast().setTXCommit();
					long timeSpent = buffer.getLast().getTimestampMillis() - beginEvent.getEvent().getHeader().getTimestamp();
					transactionExecutionTime.update(timeSpent);
					transactionRowCount.update(buffer.size());
				}
				if(eventType == EventType.XID) {
					buffer.setXid(event.xidData().getXid());
				}
				return buffer;
			}

			switch(eventType) {
				case WRITE_ROWS:
				case UPDATE_ROWS:
				case DELETE_ROWS:
				case EXT_WRITE_ROWS:
				case EXT_UPDATE_ROWS:
				case EXT_DELETE_ROWS:
					Table table = tableCache.getTable(event.getTableID());

					// a table missing from the cache, or one rejected by the filter, yields no rows
					if ( table != null && shouldOutputEvent(table.getDatabase(), table.getName(), filter, table.getColumnNames()) ) {
						for ( RowMap r : event.jsonMaps(table, getLastHeartbeatRead(), currentQuery) ) {
							if (shouldOutputRowMap(table.getDatabase(), table.getName(), r, filter)) {
								buffer.add(r);
							}
						}
					}
					// the remembered query applies to a single row event only
					currentQuery = null;
					break;
				case TABLE_MAP:
					TableMapEventData data = event.tableMapData();
					tableCache.processEvent(getSchema(), this.filter, data.getTableId(), data.getDatabase(), data.getTable());
					break;
				case ROWS_QUERY:
					RowsQueryEventData rqed = event.getEvent().getData();
					currentQuery = rqed.getQuery();
					break;
				case QUERY:
					QueryEventData qe = event.queryData();
					String sql = qe.getSql();
					// Locale.ROOT keeps the prefix checks below independent of the JVM default
					// locale (e.g. under a Turkish locale "insert" upper-cases to "İNSERT").
					String upperCaseSql = sql.toUpperCase(java.util.Locale.ROOT);

					if ( upperCaseSql.startsWith(BinlogConnectorEvent.SAVEPOINT)) {
						LOGGER.debug("Ignoring SAVEPOINT in transaction: {}", qe);
					} else if ( createTablePattern.matcher(sql).find() ) {
						// CREATE TABLE `foo` SELECT * FROM `bar` will put a CREATE TABLE
						// inside a transaction.  Note that this could, in rare cases, lead
						// to us starting on a WRITE_ROWS event -- we sync the schema position somewhere
						// kinda unsafe.
						processQueryEvent(event);
					} else if (upperCaseSql.startsWith("INSERT INTO MYSQL.RDS_") || upperCaseSql.startsWith("DELETE FROM MYSQL.RDS_")) {
						// RDS heartbeat events take the following form:
						// INSERT INTO mysql.rds_heartbeat2(id, value) values (1,1483041015005) ON DUPLICATE KEY UPDATE value = 1483041015005

						// Other RDS internal events like below:
						// INSERT INTO mysql.rds_sysinfo(name, value) values ('innodb_txn_key','Thu Nov 15 10:30:07 UTC 2018')
						// DELETE FROM mysql.rds_sysinfo where name = 'innodb_txn_key'

						// We don't need to process them, just ignore
					} else if (upperCaseSql.startsWith("DROP TEMPORARY TABLE")) {
						// Ignore temporary table drop statements inside transactions
					} else {
						LOGGER.warn("Unhandled QueryEvent @ {} inside transaction: {}", event.getPosition().fullPosition(), qe);
					}
					break;
				default:
					// every other event type is skipped while inside a transaction
					break;
			}
		}
	}


	//......

}
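  • The getTransactionRows method of BinlogConnectorReplicator first creates a RowMapBuffer and then polls events in a while loop via pollEvent. When event.isCommitEvent() is true it returns the RowMapBuffer; otherwise it handles the event according to its eventType. For row events (WRITE_ROWS, UPDATE_ROWS, DELETE_ROWS and their EXT_ variants), each RowMap that passes both shouldOutputEvent and shouldOutputRowMap is added to the RowMapBuffer; for TABLE_MAP, the table is registered through tableCache.processEvent; for ROWS_QUERY, the query text is remembered for the next row event; for QUERY, a CREATE TABLE statement is handled through processQueryEvent, while SAVEPOINT, RDS-internal statements and DROP TEMPORARY TABLE are ignored and anything else is logged as a warning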

ListWithDiskBuffer

maxwell-1.25.1/src/main/java/com/zendesk/maxwell/util/ListWithDiskBuffer.java

/**
 * A first-in/first-out list that keeps a bounded number of elements in memory and
 * spills the oldest ones to a temporary file using Java object serialization.
 *
 * <p>Elements are appended with {@link #add(Object)} and consumed with
 * {@link #removeFirst(Class)}; spilled elements are read back from the file before any
 * in-memory element is returned.  Subclasses may override {@link #shouldBuffer()} and
 * {@link #evict()} to change the spill policy.
 *
 * @param <T> element type; elements that get spilled are written with
 *            {@link ObjectOutputStream#writeObject(Object)}
 */
public class ListWithDiskBuffer<T> {
	static final Logger LOGGER = LoggerFactory.getLogger(ListWithDiskBuffer.class);
	private final long maxInMemoryElements;
	private final LinkedList<T> list;
	// number of spilled elements that have been written but not yet read back
	private long elementsInFile = 0;
	// spill file and its streams; created lazily on the first eviction / first read-back
	private File file;
	private ObjectInputStream is;
	private ObjectOutputStream os;

	/**
	 * @param maxInMemoryElements number of elements allowed in memory before the oldest
	 *                            ones are spilled to disk
	 */
	public ListWithDiskBuffer(long maxInMemoryElements) {
		this.maxInMemoryElements = maxInMemoryElements;
		list = new LinkedList<>();
	}

	/**
	 * Appends an element, then evicts the oldest in-memory elements for as long as
	 * {@link #shouldBuffer()} reports that the buffer is over its limit.
	 *
	 * @throws IOException if spilling to the temporary file fails
	 */
	public void add(T element) throws IOException {
		list.add(element);

		while ( shouldBuffer() )
			evict();
	}

	/** @return true when the in-memory list holds more than the configured element limit */
	protected boolean shouldBuffer() {
		return this.list.size() > maxInMemoryElements;
	}

	/**
	 * Resets the object output stream so that it drops its references to
	 * already-written objects.
	 */
	protected void resetOutputStreamCaches() throws IOException {
		os.reset();
	}

	/** Flushes pending spill data to the file; a no-op when nothing has been spilled yet. */
	public void flushToDisk() throws IOException {
		if ( os != null )
			os.flush();
	}

	/** @return true when neither memory nor the spill file holds any element */
	public boolean isEmpty() {
		return this.size() == 0;
	}

	/**
	 * @return the most recently added element still held in memory
	 * @throws java.util.NoSuchElementException if the in-memory list is empty
	 */
	public T getLast() {
		return list.getLast();
	}

	/**
	 * Removes and returns the oldest element, reading from the spill file first.
	 *
	 * @param clazz class used to cast the deserialized object
	 * @throws IOException            if reading from the spill file fails
	 * @throws ClassNotFoundException if the serialized class cannot be resolved
	 */
	public T removeFirst(Class<T> clazz) throws IOException, ClassNotFoundException {
		if ( elementsInFile > 0 ) {
			if ( is == null ) {
				// make everything written so far visible to the reader before opening it
				os.flush();
				is = new ObjectInputStream(new BufferedInputStream(new FileInputStream(file)));
			}

			Object object = is.readObject();
			T element = clazz.cast(object);
			elementsInFile--;

			return element;
		} else {
			return list.removeFirst();
		}
	}

	/** @return total number of elements, in memory plus on disk */
	public Long size() {
		return list.size() + elementsInFile;
	}

	/** @return number of elements currently held in memory */
	public Long inMemorySize() {
		return Long.valueOf(list.size());
	}

	/**
	 * Releases the spill file.  The streams are closed before the delete so that no
	 * file handles are left open and so the delete can succeed on platforms that
	 * refuse to remove a file that is still open.
	 */
	@Override
	protected void finalize() throws Throwable {
		try {
			closeStreams();
			if ( file != null )
				file.delete();
		} finally {
			super.finalize();
		}
	}

	/** Best-effort close of both spill-file streams; failures are logged, never thrown. */
	private void closeStreams() {
		try {
			if ( is != null )
				is.close();
		} catch ( IOException e ) {
			LOGGER.debug("Could not close spill-file input stream for {}", file, e);
		}

		try {
			if ( os != null )
				os.close();
		} catch ( IOException e ) {
			LOGGER.debug("Could not close spill-file output stream for {}", file, e);
		}
	}

	/**
	 * Moves the oldest in-memory element to the spill file, creating the file on first use.
	 *
	 * @return the element that was written to disk
	 * @throws IOException if the temporary file cannot be created or written
	 */
	protected T evict() throws IOException {
		if ( file == null ) {
			file = File.createTempFile("maxwell", "events");
			file.deleteOnExit();
			os = new ObjectOutputStream(new BufferedOutputStream(new FileOutputStream(file)));
		}

		if ( elementsInFile == 0 ) {
			LOGGER.info("Overflowed in-memory buffer, spilling over into {}", file);
		}

		T evicted = this.list.removeFirst();
		os.writeObject(evicted);

		elementsInFile++;

		// Periodically drop the output stream's object cache.  A limit of zero would make
		// the modulo throw ArithmeticException, so in that case reset after every element.
		if ( maxInMemoryElements == 0 || elementsInFile % maxInMemoryElements == 0 )
			resetOutputStreamCaches();

		return evicted;
	}

}
  • When an element is added to ListWithDiskBuffer, add checks whether the list size is larger than maxInMemoryElements and, if so, calls evict. evict executes list.removeFirst(), writes the removed element to the file through os.writeObject(evicted), increments elementsInFile, and calls resetOutputStreamCaches whenever elementsInFile is a multiple of maxInMemoryElements. Its removeFirst method reads the element from the file when elementsInFile is greater than 0; otherwise it returns list.removeFirst() directly

RowMapBuffer

maxwell-1.25.1/src/main/java/com/zendesk/maxwell/row/RowMapBuffer.java

/**
 * A {@link ListWithDiskBuffer} of {@link RowMap}s whose spill decision is based on the
 * approximate memory held by the buffered rows rather than on the element count.
 *
 * <p>Rows handed out by {@link #removeFirst()} are stamped with the transaction-level
 * values (xid, server id, thread id, schema id) stored on this buffer, plus a running
 * offset within the transaction.
 */
public class RowMapBuffer extends ListWithDiskBuffer<RowMap> {
	private static long FlushOutputStreamBytes = 10000000;
	private Long xid;
	private Long xoffset = 0L;
	private Long serverId;
	private Long threadId;
	private Long schemaId;
	private long memorySize = 0;
	private long outputStreamCacheSize = 0;
	private final long maxMemory;

	/** Creates a buffer whose memory limit is a quarter of the JVM's maximum memory. */
	public RowMapBuffer(long maxInMemoryElements) {
		this(maxInMemoryElements, (long) (Runtime.getRuntime().maxMemory() * 0.25));
	}

	/** Creates a buffer with an explicit memory limit, compared against the rows' approximate sizes. */
	public RowMapBuffer(long maxInMemoryElements, long maxMemory) {
		super(maxInMemoryElements);
		this.maxMemory = maxMemory;
	}

	/** Creates a buffer whose memory limit is the given fraction of the JVM's maximum memory. */
	public RowMapBuffer(long maxInMemoryElements, float bufferMemoryUsage) {
		this(maxInMemoryElements, (long) (Runtime.getRuntime().maxMemory() * bufferMemoryUsage));
	}

	/** Accounts for the row's approximate size, then appends it (which may trigger eviction). */
	@Override
	public void add(RowMap rowMap) throws IOException {
		memorySize += rowMap.getApproximateSize();
		super.add(rowMap);
	}

	/** @return true once the tracked memory of the in-memory rows exceeds the limit */
	@Override
	protected boolean shouldBuffer() {
		return memorySize > maxMemory;
	}

	/**
	 * Spills the oldest row to disk and updates the memory accounting.
	 *
	 * <p>The output stream keeps references to the objects it has written, so its cache
	 * is reset whenever the amount written since the last reset passes the flush threshold.
	 */
	@Override
	protected RowMap evict() throws IOException {
		RowMap evicted = super.evict();
		memorySize -= evicted.getApproximateSize();

		outputStreamCacheSize += evicted.getApproximateSize();
		if ( outputStreamCacheSize <= FlushOutputStreamBytes ) {
			return evicted;
		}

		resetOutputStreamCaches();
		outputStreamCacheSize = 0;
		return evicted;
	}

	/**
	 * Removes the oldest row and stamps it with this buffer's transaction metadata.
	 * The transaction offset is incremented after every call.
	 */
	public RowMap removeFirst() throws IOException, ClassNotFoundException {
		RowMap next = super.removeFirst(RowMap.class);

		next.setXid(xid);
		next.setXoffset(xoffset++);
		next.setServerId(serverId);
		next.setThreadId(threadId);
		next.setSchemaId(schemaId);

		return next;
	}

	public void setXid(Long xid) {
		this.xid = xid;
	}

	public void setServerId(Long serverId) {
		this.serverId = serverId;
	}

	public void setThreadId(Long threadId) {
		this.threadId = threadId;
	}

	public void setSchemaId(Long schemaId) {
		this.schemaId = schemaId;
	}
}
  • RowMapBuffer extends ListWithDiskBuffer; its add and evict methods maintain memorySize, and its shouldBuffer override checks whether memorySize is greater than maxMemory

TableCache

maxwell-1.25.1/src/main/java/com/zendesk/maxwell/replication/TableCache.java

/**
 * Cache of {@link Table} definitions keyed by the table id carried in table-map events.
 */
public class TableCache {
	private final String maxwellDB;
	private final HashMap<Long, Table> tableMapCache = new HashMap<>();

	public TableCache(String maxwellDB) {
		this.maxwellDB = maxwellDB;
	}

	/**
	 * Registers the table for {@code tableId} unless it is already cached or the filter
	 * blacklists it.
	 *
	 * @throws RuntimeException if the database or the table cannot be found in {@code schema}
	 */
	public void processEvent(Schema schema, Filter filter, Long tableId, String dbName, String tblName) {
		if ( tableMapCache.containsKey(tableId) ) {
			return;
		}

		if ( filter.isTableBlacklisted(dbName, tblName) ) {
			return;
		}

		Database database = schema.findDatabase(dbName);
		if ( database == null ) {
			throw new RuntimeException("Couldn't find database " + dbName);
		}

		Table table = database.findTable(tblName);
		if ( table == null ) {
			throw new RuntimeException("Couldn't find table " + tblName + " in database " + dbName);
		}

		tableMapCache.put(tableId, table);
	}

	/** @return the cached table for {@code tableId}, or null when none has been registered */
	public Table getTable(Long tableId) {
		return tableMapCache.get(tableId);
	}

	/** Drops every cached table mapping. */
	public void clear() {
		tableMapCache.clear();
	}
}
  • TableCache maintains tableMapCache, a map from tableId to Table. When tableMapCache does not contain the tableId, its processEvent method (unless the filter blacklists the table) finds the Database through the Schema, finds the Table through the Database, and puts it into tableMapCache

Summary

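The getTransactionRows method of BinlogConnectorReplicator first creates a RowMapBuffer and then polls events in a while loop via pollEvent. When event.isCommitEvent() is true it returns the RowMapBuffer; otherwise it handles the event according to its eventType. For row events (WRITE_ROWS, UPDATE_ROWS, DELETE_ROWS and their EXT_ variants), each RowMap that passes both shouldOutputEvent and shouldOutputRowMap is added to the RowMapBuffer; for TABLE_MAP, the table is registered through tableCache.processEvent; for ROWS_QUERY, the query text is remembered for the next row event; for QUERY, a CREATE TABLE statement is handled through processQueryEvent, while SAVEPOINT, RDS-internal statements and DROP TEMPORARY TABLE are ignored and anything else is logged as a warning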

doc

Tags: Programming Java MySQL Database SQL

Posted on Sat, 02 May 2020 12:48:51 -0700 by daniel_mintz