This class maintains a cache of open writers to avoid the OS overhead of opening and closing files. While doing so, it abides by the same rule as non-pooled writers: there can only be one TableWriter instance for any given table name. This implementation is thread-safe. Writer allocated by one thread c
| 73 | * closed. |
| 74 | */ |
| 75 | public class WriterPool extends AbstractPool { |
// Ownership-reason strings. NOTE(review): where these are reported is outside this view — confirm against callers.
| 76 | public static final String OWNERSHIP_REASON_MISSING = "missing or owned by other process"; |
// The "no reason" value is represented by null rather than by an empty string.
| 77 | public static final String OWNERSHIP_REASON_NONE = null; |
| 78 | public static final String OWNERSHIP_REASON_RELEASED = "released"; |
| 79 | public static final String OWNERSHIP_REASON_UNKNOWN = "unknown"; |
// Unlike the reasons above, this one is package-private.
| 80 | static final String OWNERSHIP_REASON_WRITER_ERROR = "writer error"; |
// Offset of the "owner" field of Entry, as returned by Unsafe.getFieldOffset.
// NOTE(review): presumably used for atomic updates of Entry.owner — confirm.
| 81 | private static final long ENTRY_OWNER = Unsafe.getFieldOffset(Entry.class, "owner"); |
// Logger for this class.
| 82 | private static final Log LOG = LogFactory.getLog(WriterPool.class); |
// Special owner value. NOTE(review): judging by the name it marks an entry held for queue processing
// rather than by a thread — confirm.
| 83 | private static final long QUEUE_PROCESSING_OWNER = -2L; |
// Assigned in the constructor from configuration.getMicrosecondClock().
| 84 | private final MicrosecondClock clock; |
// Configuration this pool was constructed with.
| 85 | private final CairoConfiguration configuration; |
// Engine instance passed to the constructor.
| 86 | @NotNull |
| 87 | private final CairoEngine engine; |
// Pool entries; iterated by countFreeWriters() and returned as-is by entries().
| 88 | private final ConcurrentHashMap<Entry> entries = new ConcurrentHashMap<>(); |
// Tracker for recent table writes, passed to the constructor.
| 89 | private final RecentWriteTracker recentWriteTracker; |
// Assigned in the constructor from configuration.getDbRoot().
| 90 | private final CharSequence root; |
| 91 | |
| 92 | /** |
| 93 | * Pool constructor. WriterPool root directory is passed via configuration. |
| 94 | * |
| 95 | * @param configuration configuration parameters. |
| 96 | * @param engine engine instance. |
| 97 | * @param recentWriteTracker tracker for recent table writes. |
| 98 | */ |
| 99 | public WriterPool(CairoConfiguration configuration, @NotNull CairoEngine engine, @NotNull RecentWriteTracker recentWriteTracker) { |
| 100 | super(configuration, configuration.getInactiveWriterTTL()); |
| 101 | this.configuration = configuration; |
| 102 | this.clock = configuration.getMicrosecondClock(); |
| 103 | this.root = configuration.getDbRoot(); |
| 104 | this.engine = engine; |
| 105 | this.recentWriteTracker = recentWriteTracker; |
| 106 | notifyListener(Thread.currentThread().threadId(), null, PoolListener.EV_POOL_OPEN); |
| 107 | } |
| 108 | |
| 109 | @TestOnly |
| 110 | public int countFreeWriters() { |
| 111 | int count = 0; |
| 112 | for (Entry e : entries.values()) { |
| 113 | final long owner = e.owner; |
| 114 | if (owner == UNALLOCATED) { |
| 115 | count++; |
| 116 | } else { |
| 117 | LOG.info().$("table is still busy [table=").$(e.writer.getTableToken()) |
| 118 | .$(", owner=").$(owner) |
| 119 | .I$(); |
| 120 | } |
| 121 | } |
| 122 | return count; |
| 123 | } |
| 124 | |
| 125 | public Map<CharSequence, Entry> entries() { |
| 126 | return entries; |
| 127 | } |
| 128 | |
| 129 | /** |
| 130 | * <p> |
| 131 | * Creates or retrieves existing TableWriter from pool. Because of TableWriter compliance with <b>single |
| 132 | * writer model</b> pool ensures there is single TableWriter instance for given table name. Table name is unique in |
nothing calls this directly
no test coverage detected
searching dependent graphs…