Documentation ¶
Overview ¶
Package qwr provides serialised writes and concurrent reads for SQLite databases.
qwr (Query Write Reader) serialises all writes through a single writer goroutine and its worker queue, while allowing concurrent read operations through a configurable connection pool.
Quick Start ¶
manager, err := qwr.New("app.db").
Reader(profile.ReadBalanced()).
Writer(profile.WriteBalanced()).
Open()
if err != nil {
log.Fatal(err)
}
defer manager.Close()
Write Operations ¶
qwr provides several write modes:
- Direct Write: Bypasses the queue for immediate execution
- Synchronous Write: Uses the worker pool, blocks until complete
- Async Write: Non-blocking, errors captured in error queue
- Batch Write: Groups multiple operations into single transactions
- Transactions: Multi-statement atomic operations
Example writes:
// Direct write (bypasses queue)
result, err := manager.Query("INSERT INTO users (name) VALUES (?)", "Alice").Write()
// Synchronous write (queued, blocks)
result, err := manager.Query("INSERT INTO users (name) VALUES (?)", "Bob").Execute()
// Async write (queued, non-blocking)
jobID, err := manager.Query("INSERT INTO users (name) VALUES (?)", "Charlie").Async()
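Batch and transactional writes use the same fluent builder. A sketch based on the Transaction and QueryBuilder.Batch APIs listed in the index below (table names are illustrative):

```go
// Transaction: multiple statements committed atomically via the write queue
txRes, txErr := manager.Transaction().
	Add("INSERT INTO users (name) VALUES (?)", "Dana").
	Add("INSERT INTO audit (msg) VALUES (?)", "user added").
	Exec()

// Batch: queue a single query for automatic grouping with other writes
batchID, batchErr := manager.Query("INSERT INTO users (name) VALUES (?)", "Eve").Batch()
```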
Read Operations ¶
Reads use the connection pool and can be executed concurrently:
rows, err := manager.Query("SELECT * FROM users").Read()
if err != nil {
log.Fatal(err)
}
defer rows.Close()
Attached Databases ¶
qwr supports SQLite's ATTACH DATABASE for working with multiple database files through a single manager. Attached databases share the main connection pool and write serialiser, enabling cross-database queries and atomic transactions across databases.
Attach databases at construction time via the builder:
manager, err := qwr.New("main.db").
Reader(profile.ReadBalanced()).
Writer(profile.WriteBalanced()).
Attach("analytics", "analytics.db", profile.Attached().
WithJournalMode(profile.JournalWal)).
Open()
Or at runtime via the manager:
manager.Attach("logs", "logs.db")
manager.ResetReaderPool() // force immediate reader access
Queries reference attached databases using the schema-qualified syntax:
// Cross-database query
rows, _ := manager.Query("SELECT u.name FROM users u JOIN analytics.events e ON ...").Read()
// Write to attached database
manager.Query("INSERT INTO analytics.events (action) VALUES (?)", "login").Execute()
Always use schema-qualified table names for attached databases (e.g. analytics.events, not just events). Unqualified names resolve to the main database.
Bare :memory: paths are rejected because each pooled connection would get its own isolated in-memory database. Use file::memory:?cache=shared for a shared in-memory attached database.
Do not use [QueryBuilder.Prepared] for schema-qualified queries before the schema is attached - the preparation will fail on every call until the schema exists.
For parallel writes to independent databases, use separate Manager instances rather than ATTACH. Attached databases share a single serialised writer, which is correct for cross-database transactions but does not offer write parallelism.
Attach is not supported with NewSQL because qwr cannot control connection creation for user-provided database handles.
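Because attached databases share the single serialised writer, a transaction can span the main and attached schemas atomically. A sketch reusing the analytics alias from above (table names are illustrative):

```go
// One atomic transaction across main.users and analytics.events
_, err := manager.Transaction().
	Add("INSERT INTO users (name) VALUES (?)", "Frank").
	Add("INSERT INTO analytics.events (action) VALUES (?)", "signup").
	Exec()
```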
Observing Events ¶
qwr emits structured events for all significant operations. Subscribe to receive events for logging, metrics, tracing, or alerting:
manager.Subscribe(func(e qwr.Event) {
fmt.Printf("event: %d at %v\n", e.Type, e.Timestamp)
})
Use filters to receive only specific event types:
manager.SubscribeFiltered(func(e qwr.Event) {
fmt.Printf("error: %v\n", e.Err)
}, qwr.ErrorEvents())
Profiles ¶
Database profiles configure connection pools and SQLite PRAGMA settings. Pre-configured profiles are available in the profile subpackage:
- profile.ReadLight(), ReadBalanced(), ReadHeavy()
- profile.WriteLight(), WriteBalanced(), WriteHeavy()
- profile.Attached() for per-schema PRAGMAs on attached databases
Error Handling ¶
Async operations capture errors in an error queue with automatic retry support for transient failures. Errors are classified by type to determine appropriate retry strategies.
if jobErr, found := manager.ErrorByID(jobID); found {
log.Printf("Job %d failed: %v", jobID, jobErr)
}
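When EnableAutoRetry is off, a periodic sweep can resubmit retriable jobs manually. A sketch using Errors, JobError.ShouldRetry, and Manager.RetryJob from the index below (the literal 3 mirrors DefaultOptions.MaxRetries):

```go
for _, jobErr := range manager.Errors() {
	if jobErr.ShouldRetry(3) { // 3 matches DefaultOptions.MaxRetries
		if err := manager.RetryJob(jobErr.ID()); err != nil {
			log.Printf("retry submit failed for job %d: %v", jobErr.ID(), err)
		}
	}
}
```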
Custom SQLite Drivers ¶
qwr uses modernc.org/sqlite by default. Use NewSQL to provide your own database connections with a different driver. Note that NewSQL does not support Attach - manage ATTACH statements on your own connections.
Index ¶
- Variables
- func New(path string, opts ...Options) *qwrBuilder
- func NewSQL(reader, writer *sql.DB, opts ...Options) *qwrBuilder
- func ReleaseQueryBuilder(qb *QueryBuilder)
- type BatchCollector
- type BatchJob
- type BatchResult
- type ErrorCategory
- type ErrorQueue
- func (eq *ErrorQueue) Clear()
- func (eq *ErrorQueue) Close()
- func (eq *ErrorQueue) Count() int
- func (eq *ErrorQueue) Get(jobID int64) (JobError, bool)
- func (eq *ErrorQueue) GetAll() []JobError
- func (eq *ErrorQueue) GetReadyForRetry(now time.Time, maxRetries int) []JobError
- func (eq *ErrorQueue) PersistError(jobErr JobError, reason string) error
- func (eq *ErrorQueue) Remove(jobID int64) bool
- func (eq *ErrorQueue) Store(jobErr JobError)
- type Event
- type EventBus
- type EventFilter
- type EventHandler
- type EventType
- type Job
- type JobError
- func (je JobError) Age() time.Duration
- func (je *JobError) CalculateNextRetry(baseDelay time.Duration)
- func (je JobError) CreateRetryQuery() Query
- func (je JobError) ID() int64
- func (je JobError) SQL() (string, []any)
- func (je JobError) ShouldRetry(maxRetries int) bool
- func (je JobError) String() string
- type JobResult
- type JobType
- type Manager
- func (m *Manager) Attach(alias, path string, p ...*profile.Profile) error
- func (m *Manager) Backup(dest string, method backup.Method) error
- func (m *Manager) Batch(job Job) error
- func (m *Manager) ClearErrors()
- func (m *Manager) Close() error
- func (m *Manager) Database() string
- func (m *Manager) ErrorByID(jobID int64) (JobError, bool)
- func (m *Manager) Errors() []JobError
- func (m *Manager) JobStatus(jobID int64) (string, error)
- func (m *Manager) Query(sql string, args ...any) *QueryBuilder
- func (m *Manager) ReaderProfile() *profile.Profile
- func (m *Manager) RemoveError(jobID int64) bool
- func (m *Manager) ResetCaches()
- func (m *Manager) ResetReaderPool() error
- func (m *Manager) RetryJob(jobID int64) error
- func (m *Manager) RunCheckpoint(mode checkpoint.Mode, schema ...string) error
- func (m *Manager) RunIncrementalVacuum(pages int, schema ...string) error
- func (m *Manager) RunVacuum(schema ...string) error
- func (m *Manager) SetSecureDelete(enabled bool) error
- func (m *Manager) Subscribe(handler EventHandler) uint64
- func (m *Manager) SubscribeFiltered(handler EventHandler, filter EventFilter) uint64
- func (m *Manager) Transaction(capacity ...int) *Transaction
- func (m *Manager) TransactionFunc(fn func(*sql.Tx) (any, error)) *TransactionFunc
- func (m *Manager) Unsubscribe(id uint64)
- func (m *Manager) WaitForIdle(ctx context.Context) error
- func (m *Manager) WriterProfile() *profile.Profile
- type Options
- type QWRError
- type Query
- type QueryBuilder
- func (qb *QueryBuilder) Async() (int64, error)
- func (qb *QueryBuilder) Batch() (int64, error)
- func (qb *QueryBuilder) Execute() (*QueryResult, error)
- func (qb *QueryBuilder) GenID() *QueryBuilder
- func (qb *QueryBuilder) Prepared() *QueryBuilder
- func (qb *QueryBuilder) Read() (*sql.Rows, error)
- func (qb *QueryBuilder) ReadClose(fn func(*sql.Rows) error) error
- func (qb *QueryBuilder) ReadRow() (*sql.Row, error)
- func (qb *QueryBuilder) Release()
- func (qb *QueryBuilder) WithContext(ctx context.Context) *QueryBuilder
- func (qb *QueryBuilder) Write() (*QueryResult, error)
- type QueryResult
- type ResultType
- type RetryStrategy
- type StmtCache
- type Transaction
- func (t *Transaction) Add(sql string, args ...any) *Transaction
- func (t *Transaction) AddPrepared(sql string, args ...any) *Transaction
- func (t *Transaction) Exec() (*TransactionResult, error)
- func (t *Transaction) ExecuteWithContext(ctx context.Context, db *sql.DB) JobResult
- func (t *Transaction) ID() int64
- func (t *Transaction) WithContext(ctx context.Context) *Transaction
- func (t *Transaction) Write() (*TransactionResult, error)
- type TransactionFunc
- func (tf *TransactionFunc) Exec() (*TransactionFuncResult, error)
- func (tf *TransactionFunc) ExecuteWithContext(ctx context.Context, db *sql.DB) JobResult
- func (tf *TransactionFunc) ID() int64
- func (tf *TransactionFunc) WithContext(ctx context.Context) *TransactionFunc
- func (tf *TransactionFunc) Write() (*TransactionFuncResult, error)
- type TransactionFuncResult
- type TransactionResult
- type WriteSerialiser
- func (wp *WriteSerialiser) QueueLen() int
- func (wp *WriteSerialiser) Start(ctx context.Context)
- func (wp *WriteSerialiser) Stop() error
- func (wp *WriteSerialiser) SubmitNoWait(ctx context.Context, job Job) (int64, error)
- func (wp *WriteSerialiser) SubmitNoWaitNoContext(job Job) (int64, error)
- func (wp *WriteSerialiser) SubmitWait(ctx context.Context, job Job) (JobResult, error)
- func (wp *WriteSerialiser) SubmitWaitNoContext(job Job) (JobResult, error)
Constants ¶
This section is empty.
Variables ¶
var (
	ErrAttachReservedAlias  = errors.New("alias is a reserved SQLite schema name")
	ErrAttachInvalidAlias   = errors.New("alias must be a valid SQLite identifier (letters, digits, underscores)")
	ErrAttachEmptyAlias     = errors.New("alias cannot be empty")
	ErrAttachEmptyPath      = errors.New("path cannot be empty")
	ErrAttachDuplicateAlias = errors.New("alias is already attached")
	ErrAttachMemoryPath     = errors.New(":memory: databases are per-connection and cannot be shared across the pool - use file::memory:?cache=shared instead")
	ErrAttachNotSupported   = errors.New("Attach is not supported with NewSQL - manage ATTACH statements on your own connections")
)
var (
	ErrManagerClosed             = errors.New("manager is closed")
	ErrReaderDisabled            = errors.New("reader is disabled")
	ErrWriterDisabled            = errors.New("writer is disabled")
	ErrResultNotFound            = errors.New("result not found")
	ErrInvalidResult             = errors.New("invalid result type")
	ErrQueryTooLarge             = errors.New("query exceeds maximum allowed size")
	ErrStatementCacheFull        = errors.New("statement cache is full")
	ErrHashCollision             = errors.New("statement hash collision")
	ErrErrorQueueDisabled        = errors.New("error queue is disabled")
	ErrJobNotFound               = errors.New("job not found in error queue")
	ErrWorkerNotRunning          = errors.New("worker pool is not running")
	ErrQueueTimeout              = errors.New("timeout waiting for queue to accept submission")
	ErrRetrySubmissionFailed     = errors.New("failed to resubmit job for retry")
	ErrInvalidQuery              = errors.New("query is invalid")
	ErrNilPreparedStatement      = errors.New("internal error: prepared statement is nil before execution")
	ErrNilPreparedStatementCache = errors.New("internal error: global prepared statement cache returned nil statement without error")
	ErrFailedToPrepareStatement  = errors.New("failed to prepare statement")
	ErrPreparedCacheRequired     = errors.New("prepared statement cache is required when using prepared queries")
	ErrCacheClosed               = errors.New("statement cache is closed")
	ErrBatchContainsNonQuery     = errors.New("batch jobs can only contain Query jobs, not Transaction or nested Batch jobs")
	ErrConnectionUnhealthy       = errors.New("database connection is unhealthy")
	ErrBackupDestinationExists   = errors.New("backup destination already exists")
	ErrBackupDriverUnsupported   = errors.New("driver does not support backup API")
	ErrBackupFailed              = errors.New("backup failed")
	ErrBackupInit                = errors.New("failed to initialize backup")
	ErrBackupStep                = errors.New("backup step failed")
	ErrBackupConnection          = errors.New("failed to get connection for backup")
	ErrBackupInvalidMethod       = errors.New("unknown backup method")
	ErrQueueFull                 = errors.New("worker queue is full")
)
Error variables
var DefaultOptions = Options{
	WorkerQueueDepth:   1000,
	EnableReader:       true,
	EnableWriter:       true,
	BatchSize:          200,
	BatchTimeout:       1 * time.Second,
	InlineInserts:      false,
	UseContexts:        false,
	StmtCacheMaxSize:   1000,
	ErrorQueueMaxSize:  1000,
	EnableAutoRetry:    false,
	MaxRetries:         3,
	BaseRetryDelay:     30 * time.Second,
	JobTimeout:         30 * time.Second,
	TransactionTimeout: 30 * time.Second,
	RetrySubmitTimeout: 5 * time.Second,
	QueueSubmitTimeout: 5 * time.Minute,
}
DefaultOptions provides sensible defaults suitable for most applications. WorkerQueueDepth is set conservatively at 1000 (not 50000) to avoid excessive memory use by default - raise it for high-throughput workloads.
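To override a default, copy DefaultOptions and pass the modified value to New, which accepts variadic Options (per the index below). A sketch:

```go
opts := qwr.DefaultOptions
opts.WorkerQueueDepth = 50000 // raise for high-throughput workloads
opts.EnableAutoRetry = true

manager, err := qwr.New("app.db", opts).
	Reader(profile.ReadBalanced()).
	Writer(profile.WriteBalanced()).
	Open()
```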
Functions ¶
func New ¶
New creates a builder for a new qwr Manager instance.
Options are immutable after construction. They can only be set here during manager creation and cannot be modified at runtime. If no options are provided, DefaultOptions will be used.
To change options, close the existing manager and create a new one with the desired options.
func NewSQL ¶
NewSQL creates a new qwr Manager instance builder using user-provided database connections. This allows you to bring your own SQLite driver (e.g., mattn/go-sqlite3 instead of modernc.org/sqlite).
Parameters:
- reader: Database connection for read operations (pass nil to disable reader)
- writer: Database connection for write operations (pass nil to disable writer)
- opts: Optional configuration options (variadic). If not provided, DefaultOptions will be used.
Important notes:
- Passing nil for reader/writer automatically disables that connection (sets EnableReader/EnableWriter to false)
- The Manager takes full ownership of the provided database connections
- Calling Manager.Close() will close these database connections
- You should not use these connections directly after passing them to NewSQL()
- Profiles will be applied to your connections (including SQLite PRAGMAs)
- If you provide a non-SQLite database, PRAGMA errors are your responsibility
- Use WithErrorLogPath() to enable persistent error logging to disk
- Attach() is not supported with NewSQL - manage ATTACH statements on your own connections
Example with mattn/go-sqlite3:
import _ "github.com/mattn/go-sqlite3"
readerDB, _ := sql.Open("sqlite3", "mydb.db")
writerDB, _ := sql.Open("sqlite3", "mydb.db")
opts := qwr.Options{ErrorLogPath: "errors.db"} // Optional error logging
manager, err := qwr.NewSQL(readerDB, writerDB, opts).
Reader(profile.ReadBalanced()).
Writer(profile.WriteBalanced()).
Open()
func ReleaseQueryBuilder ¶
func ReleaseQueryBuilder(qb *QueryBuilder)
ReleaseQueryBuilder returns a QueryBuilder to the pool after cleaning it
Types ¶
type BatchCollector ¶
type BatchCollector struct {
// contains filtered or unexported fields
}
BatchCollector manages automatic batching of database jobs for asynchronous execution.
func NewBatchCollector ¶
func NewBatchCollector(ctx context.Context, ws *WriteSerialiser, events *EventBus, options Options, dbPath string) *BatchCollector
NewBatchCollector creates a new batch collector with pre-allocated capacity and context.
func (*BatchCollector) Add ¶
func (bc *BatchCollector) Add(job Job) error
Add adds a job to the current batch for eventual execution using the collector's context. Returns ErrQueueFull if a size-triggered flush cannot submit to the worker queue.
func (*BatchCollector) Close ¶
func (bc *BatchCollector) Close() error
Close flushes any pending batch and stops the timer. Returns ErrQueueFull if the final batch cannot be submitted.
type BatchJob ¶
type BatchJob struct {
Queries []Job
// contains filtered or unexported fields
}
BatchJob represents a collection of database jobs to be executed as a batch
func (BatchJob) ExecuteWithContext ¶
ExecuteWithContext runs each job in the batch within a single transaction. If a statement cache is available, prepared queries reuse cached statements via tx.Stmt() to avoid re-parsing SQL on every execution.
type BatchResult ¶
type BatchResult struct {
Results []JobResult
// contains filtered or unexported fields
}
BatchResult represents the outcome of a batch execution
func (*BatchResult) Duration ¶
func (r *BatchResult) Duration() time.Duration
Duration returns how long the batch took to execute
func (*BatchResult) Error ¶
func (r *BatchResult) Error() error
Error returns any error that occurred during execution
func (*BatchResult) ID ¶
func (r *BatchResult) ID() int64
ID returns the ID of the batch that produced this result
type ErrorCategory ¶
type ErrorCategory int
ErrorCategory provides granular error classification for better handling
const (
	// ErrorCategoryConnection indicates connection-related errors
	ErrorCategoryConnection ErrorCategory = iota
	// ErrorCategoryLock indicates database locking/concurrency errors
	ErrorCategoryLock
	// ErrorCategoryConstraint indicates constraint violation errors
	ErrorCategoryConstraint
	// ErrorCategorySchema indicates schema-related errors
	ErrorCategorySchema
	// ErrorCategoryResource indicates resource exhaustion errors
	ErrorCategoryResource
	// ErrorCategoryTimeout indicates timeout/deadline errors
	ErrorCategoryTimeout
	// ErrorCategoryPermission indicates access control errors
	ErrorCategoryPermission
	// ErrorCategoryInternal indicates internal QWR errors
	ErrorCategoryInternal
	// ErrorCategoryUnknown indicates unclassified errors
	ErrorCategoryUnknown
)
func (ErrorCategory) String ¶
func (ec ErrorCategory) String() string
String returns the string representation of ErrorCategory
type ErrorQueue ¶
type ErrorQueue struct {
// contains filtered or unexported fields
}
ErrorQueue maintains a simple registry of failed async operations
func NewErrorQueue ¶
func NewErrorQueue(events *EventBus, opts Options, dbPath string) *ErrorQueue
NewErrorQueue creates a new error queue
func (*ErrorQueue) Count ¶
func (eq *ErrorQueue) Count() int
Count returns the number of errors in the queue
func (*ErrorQueue) Get ¶
func (eq *ErrorQueue) Get(jobID int64) (JobError, bool)
Get retrieves a specific error by job ID
func (*ErrorQueue) GetAll ¶
func (eq *ErrorQueue) GetAll() []JobError
GetAll returns all errors in the queue in chronological order
func (*ErrorQueue) GetReadyForRetry ¶
func (eq *ErrorQueue) GetReadyForRetry(now time.Time, maxRetries int) []JobError
GetReadyForRetry returns errors that are ready for retry
func (*ErrorQueue) PersistError ¶
func (eq *ErrorQueue) PersistError(jobErr JobError, reason string) error
PersistError logs an error to the database
func (*ErrorQueue) Remove ¶
func (eq *ErrorQueue) Remove(jobID int64) bool
Remove removes an error from the queue
func (*ErrorQueue) Store ¶
func (eq *ErrorQueue) Store(jobErr JobError)
Store adds or overwrites an error in the queue, or immediately persists non-retriable errors
type Event ¶ added in v0.2.0
type Event struct {
// Type identifies the kind of event. All events have a Type and Timestamp.
Type EventType
Timestamp time.Time
// --- Job context ---
// Set by: Job lifecycle, Direct write, and Retry events.
JobID int64
JobType JobType
SQL string
// --- Timing ---
// Set by: EventJobStarted (QueueWait only), EventJobCompleted, EventJobFailed,
// EventDirectWriteCompleted, EventDirectWriteFailed.
QueueWait time.Duration
ExecTime time.Duration
// --- Error context ---
// Err is set by all "Failed" and error-related events.
Err error
// EvictedCount is set by EventErrorQueueOverflow: the number of oldest
// entries removed to bring the queue back within its max size.
EvictedCount int
// --- Retry context ---
// Set by: EventJobCompleted, EventJobFailed (query jobs only),
// EventRetryScheduled, EventRetryStarted, EventRetryExhausted.
// Attempt is the zero-based retry count: 0 means the first execution,
// 1 means the first retry, etc.
Attempt int
NextRetry time.Time
// --- Batch context ---
// Set by: Batch lifecycle events.
BatchID int64
BatchSize int
BatchReason string // "size_limit", "timeout", "close"
OriginalCount int
CombinedCount int
// --- Cache context ---
// Set by: EventCacheHit, EventCacheMiss, EventCachePrepError.
CacheQuery string
CachePrepTime time.Duration
// --- Backup/maintenance context ---
// Set by: Backup and Checkpoint events.
BackupMethod string
BackupDest string
CheckpointMode string // "PASSIVE", "FULL", "RESTART", "TRUNCATE"
// --- Result ---
// Set by: EventDirectWriteCompleted.
Result sql.Result
}
Event carries data for a single occurrence in the qwr system.
This is a flat struct: all event types share the same fields, and each event type only populates the fields listed in its EventType documentation. Fields not listed for a given event type will be zero-valued. Always check the Type field first to know which other fields are meaningful.
Example handler:
func(e qwr.Event) {
switch e.Type {
case qwr.EventJobCompleted:
log.Printf("job %d completed in %v: %s", e.JobID, e.ExecTime, e.SQL)
case qwr.EventJobFailed:
log.Printf("job %d failed (attempt %d): %v", e.JobID, e.Attempt, e.Err)
}
}
type EventBus ¶ added in v0.2.0
type EventBus struct {
// contains filtered or unexported fields
}
EventBus provides synchronous, in-process event dispatch.
Delivery guarantee: each event is delivered exactly once to every subscriber whose filter matches at the time of the Emit call. Events are never queued, retried, or persisted - if no subscribers are registered, the event is silently discarded. After Close, all Emit calls are no-ops.
Concurrency model:
- Multiple goroutines may call Emit concurrently.
- Each handler is called on the emitting goroutine (not a background worker).
- Subscribe, Unsubscribe, and Emit may all be called concurrently, including from within a handler (see Emit for details).
- After Close, Emit is a no-op. Close is safe to call concurrently with Emit.
func NewEventBus ¶ added in v0.2.0
func NewEventBus() *EventBus
NewEventBus creates a ready-to-use EventBus.
func (*EventBus) Close ¶ added in v0.2.0
func (eb *EventBus) Close()
Close stops all future event delivery. After Close returns, Emit is a no-op. Any Emit calls already in progress will finish delivering to their snapshot of subscribers, but no new Emit calls will proceed.
func (*EventBus) Emit ¶ added in v0.2.0
Emit dispatches an event to all matching subscribers synchronously. Safe to call concurrently. The Timestamp field is set automatically if zero.
After Close, Emit is a no-op. Panics in handlers are recovered and do not propagate to the caller.
Handlers are called outside the lock, so it is safe to call Subscribe or Unsubscribe from within a handler without deadlocking. Changes to the subscriber list take effect on the next Emit call, not the current one.
func (*EventBus) Subscribe ¶ added in v0.2.0
func (eb *EventBus) Subscribe(handler EventHandler) uint64
Subscribe registers a handler that receives all events. Returns a subscription ID that can be passed to Unsubscribe.
func (*EventBus) SubscribeFiltered ¶ added in v0.2.0
func (eb *EventBus) SubscribeFiltered(handler EventHandler, filter EventFilter) uint64
SubscribeFiltered registers a handler that only receives events accepted by filter. If filter is nil the handler receives all events.
func (*EventBus) Unsubscribe ¶ added in v0.2.0
Unsubscribe removes the subscription with the given ID.
type EventFilter ¶ added in v0.2.0
EventFilter returns true if the handler should receive this event type. Use the predefined filters (JobEvents, ErrorEvents, etc.) or provide your own.
func BackupEvents ¶ added in v0.2.0
func BackupEvents() EventFilter
BackupEvents returns a filter matching backup events: EventBackupStarted, EventBackupCompleted, EventBackupFailed, EventBackupFallback.
func BatchEvents ¶ added in v0.2.0
func BatchEvents() EventFilter
BatchEvents returns a filter matching batch lifecycle events: EventBatchQueryAdded, EventBatchFlushed, EventBatchInlineOptimised, EventBatchSubmitted, EventBatchSubmitFailed.
func CacheEvents ¶ added in v0.2.0
func CacheEvents() EventFilter
CacheEvents returns a filter matching statement cache events: EventCacheHit, EventCacheMiss, EventCacheEvicted, EventCachePrepError.
func ErrorEvents ¶ added in v0.2.0
func ErrorEvents() EventFilter
ErrorEvents returns a filter matching all events that indicate something went wrong, across multiple categories. This includes job failures (EventJobFailed, EventDirectWriteFailed) in addition to error queue and retry exhaustion events.
Matches: EventJobFailed, EventDirectWriteFailed, EventReaderQueryFailed, EventRetrySubmitFailed, EventErrorStored, EventErrorPersisted, EventErrorQueueOverflow, EventRetryExhausted.
func JobEvents ¶ added in v0.2.0
func JobEvents() EventFilter
JobEvents returns a filter matching job lifecycle events: EventJobQueued, EventJobStarted, EventJobCompleted, EventJobFailed.
func LifecycleEvents ¶ added in v0.2.0
func LifecycleEvents() EventFilter
LifecycleEvents returns a filter matching manager and worker lifecycle events: EventManagerOpened, EventManagerClosing, EventManagerClosed, EventWorkerStarted, EventWorkerStopped.
func ReadEvents ¶ added in v0.2.1
func ReadEvents() EventFilter
ReadEvents returns a filter matching reader-side events: EventReaderQueryCompleted, EventReaderQueryFailed.
func RetryEvents ¶ added in v0.2.0
func RetryEvents() EventFilter
RetryEvents returns a filter matching retry lifecycle events: EventRetryScheduled, EventRetryStarted, EventRetryExhausted, EventRetrySubmitFailed.
func WriteEvents ¶ added in v0.2.0
func WriteEvents() EventFilter
WriteEvents returns a filter matching all write-related events, both queued (job lifecycle) and direct: EventJobQueued, EventJobStarted, EventJobCompleted, EventJobFailed, EventDirectWriteCompleted, EventDirectWriteFailed.
type EventHandler ¶ added in v0.2.0
type EventHandler func(Event)
EventHandler is a callback that receives events from the EventBus.
Handlers are called synchronously on the goroutine that called Emit. In practice this means handlers run on the write worker goroutine for job events, or on the caller's goroutine for direct operations like RunVacuum.
When multiple handlers are registered, they are called sequentially in subscription order for each event. A slow handler delays all handlers after it for that event AND blocks the caller (e.g., the write worker).
Handlers MUST NOT block. A slow handler stalls the write worker, which prevents all other queued jobs from executing. If you need to do expensive work (network I/O, disk writes, etc.), send the event to a channel and process it in a separate goroutine.
Panics in handlers are recovered by the EventBus and do not propagate.
type EventType ¶ added in v0.2.0
type EventType int
EventType identifies the kind of event emitted by the qwr system.
Event uses a single flat struct rather than per-type structs. This avoids type assertions and interface hierarchies. The trade-off is that most fields are zero-valued for any given event. Each constant below documents exactly which Event fields are populated, so you never need to read the emitting code to know what data is available.
const (
	// EventManagerOpened is emitted after Open() completes successfully.
	EventManagerOpened EventType = iota
	// EventManagerClosing is emitted at the start of Close().
	EventManagerClosing
	// EventManagerClosed is emitted at the end of Close(), just before
	// the EventBus itself is closed. This is the last event emitted by
	// a Manager; no events will follow it.
	EventManagerClosed
	// EventJobQueued is emitted when a job enters the worker queue.
	// Fields: JobID, JobType.
	EventJobQueued
	// EventJobStarted is emitted when the worker begins executing a job.
	// Fields: JobID, JobType, QueueWait.
	EventJobStarted
	// EventJobCompleted is emitted when a job finishes successfully.
	// Fields: JobID, JobType, QueueWait, ExecTime.
	// Query jobs also set: SQL, Attempt.
	EventJobCompleted
	// EventJobFailed is emitted when a job finishes with an error.
	// Fields: JobID, JobType, QueueWait, ExecTime, Err.
	// Query jobs also set: SQL, Attempt.
	EventJobFailed
	// EventDirectWriteCompleted is emitted after a direct Write() or
	// Transaction.Write() succeeds.
	// Fields: JobID, ExecTime. Query writes also set SQL and Result.
	EventDirectWriteCompleted
	// EventDirectWriteFailed is emitted after a direct Write() or
	// Transaction.Write() fails.
	// Fields: JobID, ExecTime, Err. Query writes also set SQL.
	EventDirectWriteFailed
	// EventBatchQueryAdded is emitted when a query is added to the batch.
	// Fields: BatchSize (current count after adding).
	EventBatchQueryAdded
	// EventBatchFlushed is emitted when a batch is flushed for execution.
	// Fields: BatchID, BatchSize, BatchReason ("size_limit"|"timeout"|"close").
	EventBatchFlushed
	// EventBatchInlineOptimised is emitted when inline INSERT combining
	// reduces the number of queries in a batch.
	// Fields: BatchID, OriginalCount, CombinedCount.
	EventBatchInlineOptimised
	// EventBatchSubmitted is emitted after a batch is placed on the worker queue.
	// Fields: BatchID, BatchSize.
	EventBatchSubmitted
	// EventBatchSubmitFailed is emitted when a batch cannot be queued.
	// Fields: BatchID, BatchReason ("worker_not_running"|"queue_full").
	EventBatchSubmitFailed
	// EventErrorStored is emitted when a retriable error is added to the queue.
	// Fields: JobID, Err.
	EventErrorStored
	// EventErrorRemoved is emitted when an error is removed from the queue.
	// Fields: JobID.
	EventErrorRemoved
	// EventErrorQueueCleared is emitted when ClearErrors() is called.
	// No extra fields.
	EventErrorQueueCleared
	// EventErrorQueueOverflow is emitted when the queue exceeds max size
	// and oldest entries are evicted.
	// Fields: EvictedCount.
	EventErrorQueueOverflow
	// EventErrorPersisted is emitted when an error is written to the
	// persistent error log database.
	// Fields: JobID. May also set Err for non-retriable errors.
	EventErrorPersisted
	// EventErrorPersistFailed is emitted when writing to the error log
	// database fails.
	// Fields: JobID, Err.
	EventErrorPersistFailed
	// EventRetryScheduled is emitted when a retry is scheduled after a
	// retriable failure.
	// Fields: JobID, Attempt, NextRetry.
	EventRetryScheduled
	// EventRetryStarted is emitted when a scheduled retry begins execution.
	// Fields: JobID, Attempt.
	EventRetryStarted
	// EventRetryExhausted is emitted when all retry attempts are used up.
	// Fields: JobID, Err, Attempt.
	EventRetryExhausted
	// EventCacheHit is emitted when a prepared statement is found in cache.
	// Fields: CacheQuery.
	EventCacheHit
	// EventCacheMiss is emitted when a statement is not cached and must be
	// prepared. Emitted after successful preparation.
	// Fields: CacheQuery, CachePrepTime.
	EventCacheMiss
	// EventCacheEvicted is emitted when a statement is evicted from cache.
	// No extra fields.
	EventCacheEvicted
	// EventCachePrepError is emitted when statement preparation fails.
	// Fields: CacheQuery, Err.
	EventCachePrepError
	// EventBackupStarted is emitted when a backup operation begins.
	// Fields: BackupMethod ("api"|"vacuum"), BackupDest.
	EventBackupStarted
	// EventBackupCompleted is emitted when a backup finishes successfully.
	// Fields: BackupMethod, BackupDest.
	EventBackupCompleted
	// EventBackupFailed is emitted when a backup fails.
	// Fields: BackupMethod, BackupDest, Err.
	EventBackupFailed
	// EventBackupFallback is emitted when the backup API is unsupported
	// and qwr falls back to VACUUM INTO.
	// Fields: BackupDest.
	EventBackupFallback
	// EventVacuumStarted is emitted before a VACUUM or incremental VACUUM.
	// No extra fields.
	EventVacuumStarted
	// EventVacuumCompleted is emitted after a successful VACUUM.
	// No extra fields.
	EventVacuumCompleted
	// EventVacuumFailed is emitted when a VACUUM fails.
	// Fields: Err.
	EventVacuumFailed
	// EventCheckpointStarted is emitted before a WAL checkpoint.
	// Fields: CheckpointMode.
	EventCheckpointStarted
	// EventCheckpointCompleted is emitted after a successful checkpoint.
	// Fields: CheckpointMode.
	EventCheckpointCompleted
	// EventCheckpointFailed is emitted when a WAL checkpoint fails.
	// Fields: CheckpointMode, Err.
	EventCheckpointFailed
	// EventWorkerStarted is emitted when the write worker goroutine starts.
	// No extra fields.
	EventWorkerStarted
	// EventWorkerStopped is emitted when the write worker goroutine exits.
	// No extra fields.
	EventWorkerStopped
	// EventReaderQueryCompleted is emitted after a synchronous Read(), ReadRow(),
	// or ReadClose() succeeds.
	// Fields: ExecTime. Query reads also set SQL.
	EventReaderQueryCompleted
	// EventReaderQueryFailed is emitted when a reader operation fails.
	// Fields: ExecTime, Err. Query reads also set SQL.
	EventReaderQueryFailed
	// EventRetrySubmitFailed is emitted when a background retry cannot be
	// placed on the worker queue.
	// Fields: JobID, Err.
	EventRetrySubmitFailed
)
type Job ¶
type Job struct {
Type JobType
Query Query
Transaction Transaction
BatchJob BatchJob
TransactionFunc TransactionFunc
}
Job represents a database job that can be executed
func NewTransactionFuncJob ¶ added in v0.2.2
func NewTransactionFuncJob(tf TransactionFunc) Job
NewTransactionFuncJob creates a Job from a TransactionFunc
func NewTransactionJob ¶
func NewTransactionJob(t Transaction) Job
NewTransactionJob creates a Job from a Transaction
func (Job) ExecuteWithContext ¶
ExecuteWithContext runs the job against the database with context
type JobError ¶
type JobError struct {
Query Query // The query that failed
// contains filtered or unexported fields
}
JobError represents an error that occurred during async job execution
func (*JobError) CalculateNextRetry ¶
CalculateNextRetry calculates when this job should next be retried. It uses exponential backoff with jitter to prevent thundering-herd retries.
func (JobError) CreateRetryQuery ¶
CreateRetryQuery creates a new query for retry with incremented retry count
func (JobError) ShouldRetry ¶
ShouldRetry determines if this error should be retried based on error type and retry count
type JobResult ¶
type JobResult struct {
Type ResultType
QueryResult QueryResult
TransactionResult TransactionResult
BatchResult BatchResult
TransactionFuncResult TransactionFuncResult
}
JobResult represents the outcome of a job execution
func NewBatchResult ¶
func NewBatchResult(br BatchResult) JobResult
NewBatchResult creates a JobResult from a BatchResult
func NewQueryResult ¶
func NewQueryResult(qr QueryResult) JobResult
NewQueryResult creates a JobResult from a QueryResult
func NewTransactionFuncResult ¶ added in v0.2.2
func NewTransactionFuncResult(tfr TransactionFuncResult) JobResult
NewTransactionFuncResult creates a JobResult from a TransactionFuncResult
func NewTransactionResult ¶
func NewTransactionResult(tr TransactionResult) JobResult
NewTransactionResult creates a JobResult from a TransactionResult
type Manager ¶
type Manager struct {
// contains filtered or unexported fields
}
Manager handles serialised database operations
func (*Manager) Attach ¶ added in v0.3.0
Attach attaches a database at runtime. The ATTACH statement runs immediately on the writer connection. The reader connector is updated so new reader connections get the attachment automatically as the pool recycles them. Call ResetReaderPool to force immediate reader access to the attached database.
An optional profile configures per-schema PRAGMAs for the attached database. Only PRAGMA settings are used - pool parameters are ignored.
Not supported for managers created with NewSQL.
func (*Manager) Backup ¶
Backup creates a backup of the database to the specified destination path.
Available methods:
- backup.Default: Uses backup API if available, falls back to Vacuum
- backup.API: Uses SQLite's online backup API (less locking)
- backup.Vacuum: Uses VACUUM INTO (creates optimized copy)
The destination file must not already exist.
func (*Manager) ClearErrors ¶
func (m *Manager) ClearErrors()
ClearErrors removes all errors from the queue
func (*Manager) Close ¶
Close closes all database connections and stops the worker pool. Safe to call multiple times - subsequent calls return the same result.
func (*Manager) JobStatus ¶ added in v0.3.0
JobStatus checks if a job failed by looking in the error queue
func (*Manager) Query ¶
func (m *Manager) Query(sql string, args ...any) *QueryBuilder
Query creates a new query with the given SQL and arguments
func (*Manager) ReaderProfile ¶ added in v0.3.0
ReaderProfile returns the current reader profile
func (*Manager) RemoveError ¶
RemoveError removes a specific error from the queue
func (*Manager) ResetCaches ¶
func (m *Manager) ResetCaches()
ResetCaches clears all cached prepared statements, freeing memory. The cache remains usable - new statements will be prepared on demand.
func (*Manager) ResetReaderPool ¶ added in v0.3.0
ResetReaderPool creates a new reader pool using the same connector, then drains and closes the old one. New connections will include any attachments added via Manager.Attach since the pool was last created.
In-flight queries that have already started (rows being iterated) will complete before the old resources are released. However, a read that grabs the old pool pointer just before the swap may see a transient "database is closed" error. This is benign and safe to retry.
func (*Manager) RunCheckpoint ¶
func (m *Manager) RunCheckpoint(mode checkpoint.Mode, schema ...string) error
RunCheckpoint triggers a WAL checkpoint on the main database or an attached database if a schema name is provided.
func (*Manager) RunIncrementalVacuum ¶
RunIncrementalVacuum performs incremental vacuum on the main database or an attached database if a schema name is provided.
func (*Manager) RunVacuum ¶
RunVacuum performs a VACUUM operation on the main database or an attached database if a schema name is provided.
func (*Manager) SetSecureDelete ¶
SetSecureDelete enables or disables secure_delete
func (*Manager) Subscribe ¶ added in v0.2.0
func (m *Manager) Subscribe(handler EventHandler) uint64
Subscribe registers an event handler that receives all events. Returns a subscription ID for later removal via Unsubscribe.
func (*Manager) SubscribeFiltered ¶ added in v0.2.0
func (m *Manager) SubscribeFiltered(handler EventHandler, filter EventFilter) uint64
SubscribeFiltered registers an event handler with a filter. The handler only receives events for which filter returns true.
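The Subscribe/SubscribeFiltered split amounts to applying a filter predicate before dispatching each event. A minimal stdlib sketch of that dispatch model, using illustrative stand-in types rather than qwr's actual EventBus:

```go
package main

import "fmt"

// Illustrative stand-ins for qwr's event and handler types.
type Event struct{ Type string }
type EventHandler func(Event)
type EventFilter func(Event) bool

type subscriber struct {
	h EventHandler
	f EventFilter // nil means "receive all events", as with Subscribe
}

type bus struct {
	subs map[uint64]subscriber
	next uint64
}

// subscribe registers a handler and returns its subscription ID,
// mirroring the Subscribe/SubscribeFiltered return value.
func (b *bus) subscribe(h EventHandler, f EventFilter) uint64 {
	b.next++
	b.subs[b.next] = subscriber{h: h, f: f}
	return b.next
}

// publish delivers the event to every subscriber whose filter
// accepts it; a nil filter accepts everything.
func (b *bus) publish(e Event) {
	for _, s := range b.subs {
		if s.f == nil || s.f(e) {
			s.h(e)
		}
	}
}

func main() {
	b := &bus{subs: map[uint64]subscriber{}}
	b.subscribe(func(e Event) { fmt.Println("all:", e.Type) }, nil)
	b.subscribe(func(e Event) { fmt.Println("cache only:", e.Type) },
		func(e Event) bool { return e.Type == "CacheMiss" })
	b.publish(Event{Type: "CacheHit"})  // reaches only the unfiltered handler
	b.publish(Event{Type: "CacheMiss"}) // reaches both handlers
}
```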
func (*Manager) Transaction ¶
func (m *Manager) Transaction(capacity ...int) *Transaction
Transaction creates a new transaction
func (*Manager) TransactionFunc ¶ added in v0.2.2
TransactionFunc creates a new callback-based transaction. The function receives a *sql.Tx and may perform any combination of reads and writes. Return a non-nil error to trigger rollback; return nil to commit.
func (*Manager) Unsubscribe ¶ added in v0.2.0
Unsubscribe removes a previously registered event handler.
func (*Manager) WaitForIdle ¶
WaitForIdle waits until all operations are processed
func (*Manager) WriterProfile ¶ added in v0.3.0
WriterProfile returns the current writer profile
type Options ¶
type Options struct {
// WorkerQueueDepth sets the buffer size for the serialised write queue.
// A deeper queue absorbs bursts without blocking callers but uses more
// memory. Too shallow and Execute/Async calls block waiting for space.
// Default: 1000.
WorkerQueueDepth int
// EnableReader creates the reader connection pool. Disable for
// write-only applications to avoid opening unused connections.
// Default: true.
EnableReader bool
// EnableWriter creates the writer connection and serialised worker.
// Disable for read-only applications. Without a writer, all write
// methods (Write, Execute, Async, Batch) return ErrWriterDisabled.
// Default: true.
EnableWriter bool
// BatchSize is the number of queries collected before the batch
// collector flushes automatically. Larger batches amortise the
// per-transaction cost across more rows but increase latency for
// individual writes and memory held by the collector.
// Default: 200.
BatchSize int
// BatchTimeout is the maximum time the collector waits before
// flushing a partial batch. Prevents stale writes sitting in the
// buffer when write volume is too low to trigger a size-based flush.
// Default: 1s.
BatchTimeout time.Duration
// InlineInserts enables combining of identically-structured INSERT
// statements within a batch into a single multi-value INSERT. This
// reduces round-trips but relies on simple string parsing - it will
// produce incorrect results for INSERT...SELECT or VALUES clauses
// containing parentheses in string literals.
//
// Only enable when you control the SQL structure and all batched
// inserts follow the pattern INSERT INTO t (...) VALUES (?,...).
// Default: false.
InlineInserts bool
// UseContexts makes all operations use context-aware database methods
// by default. When false, context methods are only used when the caller
// explicitly calls WithContext(). The non-context path avoids the
// overhead of context propagation for applications that don't need
// cancellation or deadlines.
// Default: false.
UseContexts bool
// StmtCacheMaxSize caps the number of prepared statements held in
// cache. Eviction is LRU. Raise this for applications with many
// distinct queries; lower it to reduce memory under constrained
// environments. Each cached statement holds a database-side resource.
// Default: 1000.
StmtCacheMaxSize int
// ErrorQueueMaxSize caps the number of errors retained in memory
// for inspection via ErrorByID. When full, oldest errors are
// evicted. If ErrorLogPath is set, evicted errors are persisted
// to disk first.
// Default: 1000.
ErrorQueueMaxSize int
// EnableAutoRetry automatically resubmits failed async jobs that
// hit retriable errors (SQLITE_BUSY, SQLITE_LOCKED, timeouts).
// Retries use exponential backoff governed by BaseRetryDelay and
// MaxRetries. Disable for applications that need manual control
// over retry logic.
// Default: false.
EnableAutoRetry bool
// MaxRetries is the maximum retry attempts for a failed async job
// before it is marked permanently failed and added to the error
// queue. Only applies when EnableAutoRetry is true.
// Default: 3.
MaxRetries int
// BaseRetryDelay is the base for exponential backoff between retries.
// Actual delay is BaseRetryDelay * 2^(attempt-1) with jitter. Longer
// delays reduce pressure on a contended database but increase the
// time before a transient failure resolves.
// Default: 30s.
BaseRetryDelay time.Duration
// JobTimeout is the deadline applied to individual query execution
// when context is available. Does not apply to transactions (see
// TransactionTimeout).
// Default: 30s.
JobTimeout time.Duration
// TransactionTimeout is the deadline applied to the entire transaction
// lifecycle (begin through commit) when context is available. Should
// be at least as long as JobTimeout since transactions typically
// contain multiple operations.
// Default: 30s.
TransactionTimeout time.Duration
// RetrySubmitTimeout caps how long the retry handler waits to
// resubmit a job to the worker queue. Prevents the retry goroutine
// from blocking indefinitely when the queue is full.
// Default: 5s.
RetrySubmitTimeout time.Duration
// QueueSubmitTimeout caps how long context-free submissions
// (SubmitWaitNoContext, SubmitNoWaitNoContext) wait for queue space.
// Prevents deadlock when the queue is saturated and no context
// cancellation is available.
// Default: 5m.
QueueSubmitTimeout time.Duration
// UsePreparedStatements makes all queries use cached prepared
// statements by default, avoiding repeated SQL parsing. Individual
// queries can still override via Prepared(). Most beneficial when
// the same queries execute repeatedly with different parameters.
// Default: false.
UsePreparedStatements bool
// ErrorLogPath is the file path for persistent error logging. When
// set, errors evicted from the in-memory queue are written to a
// SQLite database at this path for post-mortem inspection. Leave
// empty to keep error logging in-memory only.
// Default: "" (disabled).
ErrorLogPath string
}
Options holds configuration for the qwr manager's internal behaviour.
Options are immutable after manager startup. They can only be set during construction via New() and cannot be modified at runtime. To change options, close the existing manager and create a new one with the desired values.
func (*Options) SetDefaults ¶
func (o *Options) SetDefaults()
SetDefaults applies default values for any option left at its zero value.
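The InlineInserts option above combines identically-structured INSERTs in a batch into one multi-value statement via simple string handling. A rough stdlib-only sketch of that transformation, assuming (as the option requires) every statement follows the pattern INSERT INTO t (...) VALUES (?,...); the function name is illustrative, not qwr's:

```go
package main

import (
	"fmt"
	"strings"
)

// combineInserts merges INSERT statements sharing an identical prefix
// up to and including "VALUES" into one multi-value INSERT. Like the
// naive parsing the InlineInserts docs warn about, it assumes the
// VALUES clause is the trailing parenthesised group and contains no
// literal parentheses.
func combineInserts(stmts []string) (string, bool) {
	if len(stmts) < 2 {
		return "", false // nothing to combine
	}
	idx := strings.LastIndex(stmts[0], "VALUES")
	if idx < 0 {
		return "", false
	}
	prefix := stmts[0][:idx+len("VALUES")]
	rows := make([]string, 0, len(stmts))
	for _, s := range stmts {
		if !strings.HasPrefix(s, prefix) {
			return "", false // differently structured statement: give up
		}
		rows = append(rows, strings.TrimSpace(s[len(prefix):]))
	}
	return prefix + " " + strings.Join(rows, ", "), true
}

func main() {
	combined, ok := combineInserts([]string{
		"INSERT INTO users (name) VALUES (?)",
		"INSERT INTO users (name) VALUES (?)",
	})
	fmt.Println(ok, combined)
	// true INSERT INTO users (name) VALUES (?), (?)
}
```

The payoff is fewer statements per transaction; the cost, as the option docs note, is that string-level merging breaks on INSERT...SELECT or parenthesised literals, which is why InlineInserts defaults to false.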
type QWRError ¶
type QWRError struct {
// Original error from the underlying operation
Original error
// Category of the error for granular handling
Category ErrorCategory
// RetryStrategy for this specific error
Strategy RetryStrategy
// Context provides additional information about the error
Context map[string]any
// Timestamp when the error occurred
Timestamp time.Time
// Operation that caused the error (query, transaction, etc.)
Operation string
}
QWRError provides structured error information with enhanced classification
func ClassifyError ¶
ClassifyError provides enhanced error classification with detailed categorisation. If the error implements the sqliteErrorCode interface (i.e. exposes a Code() int method), classification uses the structured result code. Otherwise, it falls back to string matching on the error message.
func NewQWRError ¶
func NewQWRError(original error, category ErrorCategory, strategy RetryStrategy, operation string) *QWRError
NewQWRError creates a new structured QWR error
func (*QWRError) IsRetriable ¶
IsRetriable returns true if the error should be retried
type Query ¶
Query represents a database query operation
func (Query) ExecuteWithContext ¶
ExecuteWithContext runs the query against the database with context
type QueryBuilder ¶
type QueryBuilder struct {
// contains filtered or unexported fields
}
QueryBuilder provides a fluent API for building and executing queries
func GetQueryBuilder ¶
func GetQueryBuilder() *QueryBuilder
GetQueryBuilder retrieves a pre-allocated QueryBuilder from the object pool
func (*QueryBuilder) Async ¶
func (qb *QueryBuilder) Async() (int64, error)
Async submits the query to the worker pool for background execution. Returns immediately with a job ID that can be used to check for errors later. Failed async queries are automatically added to the error queue for inspection or retry.
func (*QueryBuilder) Batch ¶
func (qb *QueryBuilder) Batch() (int64, error)
Batch adds the query to a batch for deferred execution
IMPORTANT: Batch operations use the manager's internal context, NOT the context set via WithContext(). This means:
- Query-level contexts (from WithContext()) are ignored for batch operations
- All queries in a batch share the same manager-level context
- Timeouts and cancellation apply to the entire batch, not individual queries
If you need query-specific context control, use Execute() or Async() instead.
func (*QueryBuilder) Execute ¶
func (qb *QueryBuilder) Execute() (*QueryResult, error)
Execute submits the query to the worker pool and waits for completion. Provides queued execution with immediate error feedback. Query will be serialised with other operations but the caller will block until completion.
func (*QueryBuilder) GenID ¶
func (qb *QueryBuilder) GenID() *QueryBuilder
GenID generates a unique ID for this query
func (*QueryBuilder) Prepared ¶
func (qb *QueryBuilder) Prepared() *QueryBuilder
Prepared marks the query to use a prepared statement for execution. Prepared statements are cached and reused, reducing parsing overhead for repeated queries. Most beneficial for queries executed multiple times with different parameters.
func (*QueryBuilder) Read ¶
func (qb *QueryBuilder) Read() (*sql.Rows, error)
Read executes a read operation on the reader connection pool and returns multiple rows. Uses concurrent reader connections for better read performance. Remember to call rows.Close() when finished to prevent connection leaks.
func (*QueryBuilder) ReadClose ¶
func (qb *QueryBuilder) ReadClose(fn func(*sql.Rows) error) error
ReadClose executes a read operation and automatically closes the rows when done. The provided function receives the rows and should iterate/scan them as needed. Rows are automatically closed after the function returns, preventing resource leaks.
This is a convenience method that eliminates the need to manually defer rows.Close(), making it safer and cleaner for typical read operations.
Example:
var users []User
err := mgr.Query("SELECT id, name FROM users").ReadClose(func(rows *sql.Rows) error {
for rows.Next() {
var u User
if err := rows.Scan(&u.ID, &u.Name); err != nil {
return err
}
users = append(users, u)
}
return nil
})
Returns any error from the query execution, the callback function, or row iteration.
func (*QueryBuilder) ReadRow ¶
func (qb *QueryBuilder) ReadRow() (*sql.Row, error)
ReadRow executes a read operation on the reader connection pool and returns a single row. Convenient for queries expected to return exactly one row. Use row.Scan() to extract values. sql.ErrNoRows is returned when no rows match the query.
func (*QueryBuilder) Release ¶
func (qb *QueryBuilder) Release()
Release manually returns the QueryBuilder to the object pool for reuse. Only call this if you don't execute the query (Write/Async/Execute/Read/ReadRow). All execution methods automatically release the QueryBuilder when complete.
func (*QueryBuilder) WithContext ¶
func (qb *QueryBuilder) WithContext(ctx context.Context) *QueryBuilder
WithContext adds a context to the query and enables context usage for this specific query. The context will be used for timeouts, cancellation, and deadlines during query execution. Note: Batch operations ignore query-level contexts and use the manager's internal context.
func (*QueryBuilder) Write ¶
func (qb *QueryBuilder) Write() (*QueryResult, error)
Write executes the query directly on the writer connection, bypassing the worker queue. This provides immediate execution and error feedback but may block the caller. Returns a QueryResult containing SQL result, error, duration, and query ID.
type QueryResult ¶
QueryResult represents the outcome of a query execution
func (*QueryResult) Duration ¶
func (r *QueryResult) Duration() time.Duration
Duration returns how long the query took to execute
func (*QueryResult) Error ¶
func (r *QueryResult) Error() error
Error returns any error that occurred during execution
func (*QueryResult) ID ¶
func (r *QueryResult) ID() int64
ID returns the ID of the query that produced this result
type ResultType ¶
type ResultType int
const (
	ResultTypeQuery ResultType = iota
	ResultTypeTransaction
	ResultTypeBatch
	ResultTypeTransactionFunc
)
type RetryStrategy ¶
type RetryStrategy int
RetryStrategy defines how errors should be retried
const (
	// RetryStrategyNone indicates no retry should be attempted
	RetryStrategyNone RetryStrategy = iota
	// RetryStrategyImmediate indicates immediate retry with no delay
	RetryStrategyImmediate
	// RetryStrategyExponential indicates exponential backoff retry
	RetryStrategyExponential
	// RetryStrategyLinear indicates linear backoff retry
	RetryStrategyLinear
)
func (RetryStrategy) String ¶
func (rs RetryStrategy) String() string
String returns the string representation of RetryStrategy
type StmtCache ¶
type StmtCache struct {
// contains filtered or unexported fields
}
StmtCache is a prepared statement cache backed by ristretto, providing fast concurrent access with LRU eviction. Stores SQL prepared statements keyed by their query string.
func NewStmtCache ¶
NewStmtCache creates a new prepared statement cache with LRU eviction. maxSize controls the maximum number of statements to cache (0 = default 1000).
func (*StmtCache) Clear ¶
func (c *StmtCache) Clear()
Clear closes all cached statements and clears the cache. The cache remains usable after Clear() - new statements will be prepared on demand.
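The capped-size, evict-on-insert behaviour of a statement cache can be illustrated with a small stdlib LRU keyed by query string. This is a simplified sketch, not qwr's ristretto-backed implementation: the real cache stores *sql.Stmt values and closes them on eviction, while this one stores only the keys.

```go
package main

import (
	"container/list"
	"fmt"
)

// lru is a minimal least-recently-used cache keyed by SQL text.
type lru struct {
	max   int
	order *list.List               // front = most recently used
	items map[string]*list.Element // SQL text -> list element
}

func newLRU(max int) *lru {
	return &lru{max: max, order: list.New(), items: map[string]*list.Element{}}
}

// get reports whether the statement is cached, refreshing its
// recency on a hit (the EventCacheHit path).
func (c *lru) get(sql string) bool {
	if el, ok := c.items[sql]; ok {
		c.order.MoveToFront(el)
		return true
	}
	return false
}

// put caches a statement, evicting the least recently used entry
// when the cache is full (the EventCacheEvicted path).
func (c *lru) put(sql string) {
	if c.get(sql) {
		return // already cached; get refreshed its recency
	}
	if c.order.Len() >= c.max {
		oldest := c.order.Back()
		c.order.Remove(oldest)
		delete(c.items, oldest.Value.(string))
	}
	c.items[sql] = c.order.PushFront(sql)
}

func main() {
	c := newLRU(2)
	c.put("SELECT 1")
	c.put("SELECT 2")
	c.get("SELECT 1") // refresh; "SELECT 2" is now least recently used
	c.put("SELECT 3") // evicts "SELECT 2"
	fmt.Println(c.get("SELECT 1"), c.get("SELECT 2"), c.get("SELECT 3"))
	// true false true
}
```

This is why StmtCacheMaxSize trades memory for hit rate: each cached entry pins a database-side resource, and the LRU policy keeps the most recently reused queries resident.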
type Transaction ¶
type Transaction struct {
// contains filtered or unexported fields
}
Transaction represents multiple SQL statements to execute in a transaction
func (*Transaction) Add ¶
func (t *Transaction) Add(sql string, args ...any) *Transaction
Add adds a query to the transaction
func (*Transaction) AddPrepared ¶
func (t *Transaction) AddPrepared(sql string, args ...any) *Transaction
AddPrepared adds a prepared query to the transaction
func (*Transaction) Exec ¶
func (t *Transaction) Exec() (*TransactionResult, error)
Exec runs the transaction through the worker pool
func (*Transaction) ExecuteWithContext ¶
ExecuteWithContext runs all queries within a single transaction. Used by the write serialiser to dispatch Transaction jobs.
func (*Transaction) WithContext ¶
func (t *Transaction) WithContext(ctx context.Context) *Transaction
WithContext adds context to the transaction
func (*Transaction) Write ¶
func (t *Transaction) Write() (*TransactionResult, error)
Write executes the transaction directly
type TransactionFunc ¶ added in v0.2.2
type TransactionFunc struct {
// contains filtered or unexported fields
}
TransactionFunc executes a caller-provided function within a serialised transaction. The callback receives a *sql.Tx for full read-write access. qwr handles transaction lifecycle (begin, commit on success, rollback on error or panic).
func (*TransactionFunc) Exec ¶ added in v0.2.2
func (tf *TransactionFunc) Exec() (*TransactionFuncResult, error)
Exec runs the transaction through the serialised writer queue. Blocks until the callback completes and the transaction is committed or rolled back.
func (*TransactionFunc) ExecuteWithContext ¶ added in v0.2.2
ExecuteWithContext runs the callback within a transaction on the given db. Used by the write serialiser to dispatch TransactionFunc jobs.
func (*TransactionFunc) ID ¶ added in v0.2.2
func (tf *TransactionFunc) ID() int64
ID returns the unique identifier for this transaction.
func (*TransactionFunc) WithContext ¶ added in v0.2.2
func (tf *TransactionFunc) WithContext(ctx context.Context) *TransactionFunc
WithContext adds context to the transaction. When set, qwr uses BeginTx with a timeout derived from Options.TransactionTimeout.
func (*TransactionFunc) Write ¶ added in v0.2.2
func (tf *TransactionFunc) Write() (*TransactionFuncResult, error)
Write executes the transaction directly on the writer connection, bypassing the queue. The callback still runs within a real transaction.
type TransactionFuncResult ¶ added in v0.2.2
type TransactionFuncResult struct {
// Value is the result returned by the callback on success.
Value any
// contains filtered or unexported fields
}
TransactionFuncResult holds the outcome of a TransactionFunc execution.
func (*TransactionFuncResult) Duration ¶ added in v0.2.2
func (r *TransactionFuncResult) Duration() time.Duration
Duration returns how long the transaction took to execute.
func (*TransactionFuncResult) Error ¶ added in v0.2.2
func (r *TransactionFuncResult) Error() error
Error returns any error from the callback or transaction lifecycle.
func (*TransactionFuncResult) ID ¶ added in v0.2.2
func (r *TransactionFuncResult) ID() int64
ID returns the ID of the transaction that produced this result.
type TransactionResult ¶
type TransactionResult struct {
Results []*QueryResult
// contains filtered or unexported fields
}
TransactionResult represents the outcome of a transaction execution
func (*TransactionResult) Duration ¶
func (r *TransactionResult) Duration() time.Duration
Duration returns how long the transaction took to execute
func (*TransactionResult) Error ¶
func (r *TransactionResult) Error() error
Error returns any error that occurred during execution
func (*TransactionResult) ID ¶
func (r *TransactionResult) ID() int64
ID returns the ID of the transaction that produced this result
type WriteSerialiser ¶
type WriteSerialiser struct {
// contains filtered or unexported fields
}
WriteSerialiser manages a single worker that processes database jobs
func NewWorkerPool ¶
func NewWorkerPool(db *sql.DB, queueDepth int, events *EventBus, writeCache *StmtCache, options Options) *WriteSerialiser
NewWorkerPool creates a new worker pool for database jobs
func (*WriteSerialiser) QueueLen ¶ added in v0.2.0
func (wp *WriteSerialiser) QueueLen() int
QueueLen returns the current number of items in the work queue.
func (*WriteSerialiser) Start ¶
func (wp *WriteSerialiser) Start(ctx context.Context)
Start begins the worker processing loop
func (*WriteSerialiser) Stop ¶
func (wp *WriteSerialiser) Stop() error
Stop shuts down the worker pool
func (*WriteSerialiser) SubmitNoWait ¶
SubmitNoWait submits a job to the queue without waiting
func (*WriteSerialiser) SubmitNoWaitNoContext ¶
func (wp *WriteSerialiser) SubmitNoWaitNoContext(job Job) (int64, error)
SubmitNoWaitNoContext submits a job to the queue without waiting and without using a context
func (*WriteSerialiser) SubmitWait ¶
SubmitWait submits a job to the queue and waits for its result
func (*WriteSerialiser) SubmitWaitNoContext ¶
func (wp *WriteSerialiser) SubmitWaitNoContext(job Job) (JobResult, error)
SubmitWaitNoContext submits a job to the queue and waits for its result without using a context
Source Files
¶
- attach.go
- backup.go
- batch.go
- batch_job.go
- batch_result.go
- connector.go
- doc.go
- dsn_unix.go
- error_queue.go
- errors.go
- event.go
- eventbus.go
- filters.go
- job.go
- options.go
- query.go
- query_builder.go
- query_result.go
- qwr.go
- qwr_builder.go
- statement_cache.go
- transaction.go
- transaction_func.go
- transaction_func_result.go
- transaction_result.go
- write_serialiser.go
Directories
¶
| Path | Synopsis |
|---|---|
| backup | Package backup defines backup methods for SQLite databases. |
| checkpoint | Package checkpoint defines WAL checkpoint modes for SQLite. |
| profile | Package profile provides pre-configured database profiles for different workload types. |