Tamanho do fetch de cursor no PostgreSQL (EclipseLink JPA)
O cursor é um método conveniente para gerenciar a quantidade de memória usada por um conjunto de resultados em qualquer momento. No caso do driver JDBC para PostgreSQL, requisitos adicionais devem ser atendidos para que o conjunto de dados inteiro não seja buscado, apesar do tamanho de paginação definido. Se ignorarmos isso, nossa aplicação pode consumir alguns GB de dados a mais do que pretendíamos.
Conjunto de resultados do cursor
As condições mais importantes são que o cursor deve ser configurado como TYPE_FORWARD_ONLY e a conexão não deve estar no modo de auto-commit. Considerando a própria interface JDBC, não há problemas em atender a esses requisitos, mas no caso do JPA, é mais complicado. Vamos ver o que pode acontecer ao usar o EclipseLink.
import org.eclipse.persistence.config.HintValues;
import org.eclipse.persistence.config.QueryHints;
import org.eclipse.persistence.config.ResultSetType;
import org.eclipse.persistence.queries.Cursor;
import javax.persistence.EntityManager;
import javax.persistence.EntityManagerFactory;
import javax.persistence.PersistenceContext;
import javax.persistence.PersistenceUnit;
import javax.persistence.Query;
public class ForwardCursorTest {
@PersistenceContext
private EntityManager entityManager;
private Cursor getInternalPgResultSet() {
Query usersQuery = entityManager.createQuery("SELECT u from User u")
.setHint(QueryHints.RESULT_SET_TYPE, ResultSetType.ForwardOnly)
.setHint(QueryHints.SCROLLABLE_CURSOR, HintValues.TRUE)
.setHint(QueryHints.MAINTAIN_CACHE, HintValues.FALSE)
.setHint(QueryHints.JDBC_FETCH_SIZE, FETCH_SIZE);
return (Cursor) usersQuery.getSingleResult();
}
}
Para obter um cursor típico para uma query JPA, eu uso as hints RESULT_SET_TYPE, SCROLLABLE_CURSOR, MAINTAIN_CACHE e JDBC_FETCH_SIZE. Graças a elas, o EclipseLink construirá o resultado da query como um cursor paginado. Neste ponto, podemos simplesmente converter o resultado para o Iterator, mas para o propósito de testar o estado interno do cursor, usarei a interface de cursor específica do EclipseLink.
Verificação do tamanho do fetch do cursor
Em uma query construída dessa forma, não temos como configurar a propriedade autocommit. A especificação JPA não define
tal interface. O valor desse parâmetro será controlado dependendo da demarcação e do tipo de transação (RESOURCE_LOCAL ou JTA).
Por exemplo, podemos nos surpreender que, para uma transação somente de leitura, nosso cursor buscará o conjunto de dados inteiro:
import org.eclipse.persistence.queries.Cursor;
import org.junit.jupiter.api.Test;
import org.postgresql.jdbc.PgResultSet;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.transaction.annotation.Transactional;
import java.sql.SQLException;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotEquals;
@SpringBootTest
public class ForwardCursorTest {
private static final int FETCH_SIZE = 1;
private interface Function<T, R, E extends Throwable> {
R apply(T t) throws E;
}
private static int getConsuming(
Cursor cursor,
Function<PgResultSet, Integer, SQLException> getter
) throws SQLException {
PgResultSet resultSet = cursor.getResultSet().unwrap(PgResultSet.class);
Integer result = getter.apply(resultSet);
cursor.close();
return result;
}
//...
@Test
@Transactional(readOnly = true)
public void testForwardCursorFetchSize_NonTransactional_ReadOnly() throws SQLException {
Cursor cursor = getInternalPgResultSet();
long fetchSize = getConsuming(cursor, PgResultSet::getLastUsedFetchSize);
assertNotEquals(FETCH_SIZE, fetchSize);
assertEquals(
entityManager.createQuery("SELECT COUNT(u) FROM User u").getSingleResult(),
fetchSize
);
}
@Test
public void testForwardCursorFetchSize_NonTransactional() throws SQLException {
Cursor cursor = getInternalPgResultSet();
assertNotEquals(FETCH_SIZE, getConsuming(cursor, PgResultSet::getLastUsedFetchSize));
}
}
Somente após o início real da transação, obteremos o tamanho esperado do cursor. Esse momento pode variar dependendo do
gerenciador de transações. Por padrão, no Spring, isso ocorrerá durante a primeira operação de modificação. Se anexarmos a
configuração EclipseLinkJpaDialect com a flag lazyDatabaseTransaction definida como false, qualquer query em um modo diferente de somente leitura iniciará a transação.
Na ausência de um gerenciador, o início adequado pode ser forçado através do beginEarlyTransaction().
import javax.persistence.EntityManagerFactory;
@SpringBootTest
public class ForwardCursorTest {
//...
@Test
@Transactional
public void testForwardCursorFetchSize_Transactional() throws SQLException {
entityManager.createNativeQuery("set application_name to 'Implicit autocommit disable';")
.executeUpdate();
Cursor cursor = getInternalPgResultSet();
assertEquals(FETCH_SIZE, getConsuming(cursor, PgResultSet::getLastUsedFetchSize));
}
@PersistenceUnit
private EntityManagerFactory entityManagerFactory;
@Test
public void testForwardCursorFetchSize_Transactional_NonManaged() throws SQLException {
entityManager = entityManagerFactory.createEntityManager();
entityManager.getTransaction().begin();
entityManager.unwrap(UnitOfWork.class).beginEarlyTransaction();
Cursor cursor = getInternalPgResultSet();
assertEquals(FETCH_SIZE, getConsuming(cursor, PgResultSet::getLastUsedFetchSize));
entityManager.getTransaction().rollback();
}
}
Em seguida, no caso de transações JEE, a especificação JCA (JBoss / WildFly JDBC) garante que o modo de auto-commit
seja desativado no início da transação (para a fonte de dados transacional). No contexto de uma transação JEE
com o atributo TransactionAttributeType.NEVER, não iniciaremos uma transação JPA nem obteremos uma
conexão usando o método unwrap(). Podemos tentar interceptar a conexão ouvindo eventos de sessão do EclipseLink,
embora a modificação dos atributos não esteja em conformidade com a especificação EJB 3.
Modificação dos parâmetros de conexão
Considerando as especificações acima, eu desaconselharia quebrar a convenção. Vamos tentar atender às condições do controlador de acordo com as tecnologias usadas. No entanto, na ausência de tais restrições, seríamos capazes de adicionar suporte para tal caso de uso a um custo relativamente baixo? A resposta para esta pergunta certamente será a interface SessionEventListener do EclipseLink.
package dev.termian.demo;
import org.eclipse.persistence.config.QueryHints;
import org.eclipse.persistence.config.ResultSetType;
import org.eclipse.persistence.internal.databaseaccess.Accessor;
import org.eclipse.persistence.internal.databaseaccess.DatasourceCall;
import org.eclipse.persistence.internal.jpa.QueryHintsHandler;
import org.eclipse.persistence.internal.sessions.AbstractSession;
import org.eclipse.persistence.queries.DatabaseQuery;
import org.eclipse.persistence.sessions.SessionEvent;
import org.eclipse.persistence.sessions.SessionEventAdapter;
import java.sql.Connection;
import java.sql.SQLException;
import java.util.Collections;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
public class CursorQueryAutocommitDisabler extends SessionEventAdapter {
private final Set<Accessor> modifiedAccessors =
Collections.newSetFromMap(new ConcurrentHashMap<>());
@Override
public void preExecuteCall(SessionEvent event) { // #1
super.preExecuteCall(event);
DatabaseQuery query = getForwardCursorQuery(event);
if (query != null) {
disableAutocommit(query, query.getSession());
}
}
private void disableAutocommit(DatabaseQuery query, AbstractSession session) {
for (Accessor accessor : query.getAccessors()) { // #3a
accessor.incrementCallCount(session); // #5
Connection connection = accessor.getConnection(); // #3b
try {
if (connection.getAutoCommit()) {
connection.setAutoCommit(false); // #6
modifiedAccessors.add(accessor); // #7
}
} catch (SQLException e) {
throw new RuntimeException(e);
}
}
}
@Override
public void postExecuteCall(SessionEvent event) {
super.preExecuteCall(event);
DatabaseQuery query = getForwardCursorQuery(event);
if (query != null) {
for (Accessor accessor : query.getAccessors()) {
accessor.decrementCallCount();
}
}
}
private DatabaseQuery getForwardCursorQuery(SessionEvent event) {
if (!(event.getCall() instanceof DatasourceCall)) {
return null;
}
DatasourceCall call = (DatasourceCall) event.getCall();
if (call.getQuery() == null) {
return null;
}
DatabaseQuery query = call.getQuery(); // #2
//noinspection unchecked
Map<String, Object> hints = (Map<String, Object>) query
.getProperty(QueryHintsHandler.QUERY_HINT_PROPERTY);
if (hints == null || // #4
!ResultSetType.ForwardOnly.equals(hints.get(QueryHints.RESULT_SET_TYPE))) {
return null;
}
return query;
}
@Override
public void preReleaseConnection(SessionEvent event) {
super.preReleaseConnection(event);
Accessor accessor = (Accessor) event.getResult();
if (modifiedAccessors.remove(accessor)) {
Connection connection = accessor.getConnection();
try {
connection.rollback();
connection.setAutoCommit(true); // #8
} catch (SQLException e) {
accessor.setIsValid(false); // #9
}
}
}
}
O método preExecuteCall() (#1) é o momento em que o EclipseLink 2.7+ já inicializou a lista de acessadores (#3a)
através dos quais a conexão com o banco de dados é feita (#3b). Aqui podemos verificar que tipo de query será executada em um momento (#2).
Lidando com uma query de cursor do tipo forward, incrementamos a contagem de conexões (#4). No caso de um pool de conexões externo (por exemplo, JNDI), é aqui que a conexão SQL é recuperada (se não forçada, geralmente logo após). Conexões do pool interno são inicializadas antes de chamar o preExecuteCall.
Então, desativamos o autocommit (#5) e marcamos o acessador como modificado (#6) para que mais tarde possamos restaurar a propriedade anterior da conexão (#7). Eventualmente, antes de retornar ao pool, por exemplo, ao fechar o cursor, os bloqueios compartilhados
são liberados. Em caso de um erro inesperado, invalidamos o acessador (#9), informando ao EclipseLink para fechar a conexão.
Agora podemos aplicar nossa configuração no arquivo persistence.xml:
<?xml version="1.0" encoding="UTF-8"?>
<persistence version="2.1" xmlns="http://xmlns.jcp.org/xml/ns/persistence"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://xmlns.jcp.org/xml/ns/persistence
http://xmlns.jcp.org/xml/ns/persistence/persistence_2_1.xsd">
<persistence-unit name="my-pu">
<!--...-->
<properties>
<!--...-->
<property name="eclipselink.session-event-listener"
value="dev.termian.demo.CursorQueryAutocommitDisabler"/>
</properties>
</persistence-unit>
</persistence>
Alternativamente, a configuração pode ser adicionada temporariamente a uma sessão de servidor compartilhada:
import org.eclipse.persistence.sessions.SessionEventManager;
import org.eclipse.persistence.sessions.server.ServerSession;
public class ForwardCursorTest {
//...
@Test
public void testForwardCursorFetchSize_NonTransactional_AutocommitDisabled()
throws SQLException {
SessionEventManager eventManager = entityManagerFactory.createEntityManager()
.unwrap(ServerSession.class).getEventManager();
CursorQueryAutocommitDisabler queryListener = new CursorQueryAutocommitDisabler();
eventManager.addListener(queryListener);
try {
Cursor cursor = getInternalPgResultSet();
assertEquals(FETCH_SIZE, getConsuming(cursor, PgResultSet::getLastUsedFetchSize));
} finally { // ServerSession is shared by entity managers of the same factory
eventManager.removeListener(queryListener);
}
Cursor cursorOnReusedConnection = getInternalPgResultSet();
assertNotEquals(FETCH_SIZE,
getConsuming(cursorOnReusedConnection, PgResultSet::getLastUsedFetchSize)
);
}
}

