mirror of
https://github.com/jetzig-framework/jetzig.git
synced 2025-05-14 14:06:08 +00:00
Formalise JetKV interface for store, cache, jobs
Provide configuration for JetKV backend (memory or file allocator).
This commit is contained in:
parent
19da5362cf
commit
b7d750c54f
@ -7,17 +7,17 @@
|
||||
.hash = "1220482f07f2bbaef335f20d6890c15a1e14739950b784232bc69182423520e058a5",
|
||||
},
|
||||
.zmpl = .{
|
||||
.url = "https://github.com/jetzig-framework/zmpl/archive/046c05d376a4fe89d86c52596baa18137891cd87.tar.gz",
|
||||
.hash = "1220d8890b1161e4356d2c59d4b88280566d55480dae840b6f5dae34bf852bef6b56",
|
||||
.url = "https://github.com/jetzig-framework/zmpl/archive/4457eba50bd2eff3743601aa00adbffebcd4207a.tar.gz",
|
||||
.hash = "122001d661e534ef59fc20936330bb0c3068c8aaf1b7c60f9dde9e58d9a536918754",
|
||||
},
|
||||
.jetkv = .{
|
||||
.url = "https://github.com/jetzig-framework/jetkv/archive/edfda9108c857dd5a04f87ae48667d3ed57612d9.tar.gz",
|
||||
.hash = "122015e8a911a7e3a519bbf09c5ef932b569428f6eb6946417df5747a2e76b2a09b0",
|
||||
},
|
||||
.args = .{
|
||||
.url = "https://github.com/MasterQ32/zig-args/archive/01d72b9a0128c474aeeb9019edd48605fa6d95f7.tar.gz",
|
||||
.hash = "12208a1de366740d11de525db7289345949f5fd46527db3f89eecc7bb49b012c0732",
|
||||
},
|
||||
.jetkv = .{
|
||||
.url = "https://github.com/jetzig-framework/jetkv/archive/a6fcc2df220c1a40094e167eeb567bb5888404e9.tar.gz",
|
||||
.hash = "12207bd2d7465b33e745a5b0567172377f94a221d1fc9aab238bb1b372c64f4ec1a0",
|
||||
},
|
||||
.smtp_client = .{
|
||||
.url = "https://github.com/karlseguin/smtp_client.zig/archive/e79e411862d4f4d41657bf41efb884efca3d67dd.tar.gz",
|
||||
.hash = "12209907c69891a38e6923308930ac43bfb40135bc609ea370b5759fc2e1c4f57284",
|
||||
|
24
demo/src/app/views/cache.zig
Normal file
24
demo/src/app/views/cache.zig
Normal file
@ -0,0 +1,24 @@
|
||||
const std = @import("std");
|
||||
const jetzig = @import("jetzig");
|
||||
|
||||
pub fn index(request: *jetzig.Request, data: *jetzig.Data) !jetzig.View {
|
||||
var root = try data.object();
|
||||
try root.put("cached_value", try request.cache.get("example"));
|
||||
|
||||
return request.render(.ok);
|
||||
}
|
||||
|
||||
pub fn post(request: *jetzig.Request, data: *jetzig.Data) !jetzig.View {
|
||||
var root = try data.object();
|
||||
|
||||
const params = try request.params();
|
||||
|
||||
if (params.get("message")) |message| {
|
||||
try request.cache.put("message", message);
|
||||
try root.put("message", message);
|
||||
} else {
|
||||
try root.put("message", data.string("[no message param detected]"));
|
||||
}
|
||||
|
||||
return request.render(.ok);
|
||||
}
|
3
demo/src/app/views/cache/index.zmpl
vendored
Normal file
3
demo/src/app/views/cache/index.zmpl
vendored
Normal file
@ -0,0 +1,3 @@
|
||||
<div>
|
||||
<span>Cached value: {{.cached_value}}</span>
|
||||
</div>
|
3
demo/src/app/views/cache/post.zmpl
vendored
Normal file
3
demo/src/app/views/cache/post.zmpl
vendored
Normal file
@ -0,0 +1,3 @@
|
||||
<div>
|
||||
<span>Value "{{.message}}" added to cache</span>
|
||||
</div>
|
@ -6,29 +6,23 @@ pub fn index(request: *jetzig.Request, data: *jetzig.Data) !jetzig.View {
|
||||
|
||||
// Fetch a string from the KV store. If it exists, store it in the root data object,
|
||||
// otherwise store a string value to be picked up by the next request.
|
||||
if (request.kvGet(.string, "example-key")) |capture| {
|
||||
try root.put("stored_string", data.string(capture));
|
||||
if (try request.store.fetchRemove("example-key")) |capture| {
|
||||
try root.put("stored_string", capture);
|
||||
} else {
|
||||
try request.kvPut(.string, "example-key", "example-value");
|
||||
try root.put("stored_string", null);
|
||||
try request.store.put("example-key", data.string("example-value"));
|
||||
}
|
||||
|
||||
// Pop an item from the array and store it in the root data object. This will empty the
|
||||
// Left-pop an item from the array and store it in the root data object. This will empty the
|
||||
// array after multiple requests.
|
||||
if (request.kvPop("example-array")) |string| try root.put("popped", data.string(string));
|
||||
|
||||
// Fetch an array from the KV store. If it exists, store its values in the root data object,
|
||||
// otherwise store a new array to be picked up by the next request.
|
||||
if (request.kvGet(.array, "example-array")) |kv_array| {
|
||||
var array = try data.array();
|
||||
for (kv_array.items()) |item| try array.append(data.string(item));
|
||||
try root.put("stored_array", array);
|
||||
if (try request.store.popFirst("example-array")) |value| {
|
||||
try root.put("popped", value);
|
||||
} else {
|
||||
// Create a KV Array and store it in the key value store.
|
||||
var kv_array = request.kvArray();
|
||||
try kv_array.append("hello");
|
||||
try kv_array.append("goodbye");
|
||||
try kv_array.append("hello again");
|
||||
try request.kvPut(.array, "example-array", kv_array);
|
||||
try root.put("popped", null);
|
||||
// Store some values in an array in the KV store.
|
||||
try request.store.append("example-array", data.string("hello"));
|
||||
try request.store.append("example-array", data.string("goodbye"));
|
||||
try request.store.append("example-array", data.string("hello again"));
|
||||
}
|
||||
|
||||
return request.render(.ok);
|
||||
|
4
demo/src/app/views/kvstore/index.zmpl
Normal file
4
demo/src/app/views/kvstore/index.zmpl
Normal file
@ -0,0 +1,4 @@
|
||||
<div>Stored string: {{.stored_string}}</div>
|
||||
<div>Left-popped array value: {{.popped}}</div>
|
||||
|
||||
<div>Refresh this page to cycle through values</div>
|
@ -41,6 +41,45 @@ pub const jetzig_options = struct {
|
||||
// milliseconds.
|
||||
// pub const job_worker_sleep_interval_ms: usize = 10;
|
||||
|
||||
/// Key-value store options. Set backend to `.file` to use a file-based store.
|
||||
/// When using `.file` backend, you must also set `.file_options`.
|
||||
/// The key-value store is exposed as `request.store` in views and is also available in as
|
||||
/// `env.store` in all jobs/mailers.
|
||||
pub const store: jetzig.kv.Store.KVOptions = .{
|
||||
.backend = .memory,
|
||||
// .backend = .file,
|
||||
// .file_options = .{
|
||||
// .path = "/path/to/jetkv-store.db",
|
||||
// .truncate = false, // Set to `true` to clear the store on each server launch.
|
||||
// .address_space_size = jetzig.jetkv.JetKV.FileBackend.addressSpace(4096),
|
||||
// },
|
||||
};
|
||||
|
||||
/// Job queue options. Identical to `store` options, but allows using different
|
||||
/// backends (e.g. `.memory` for key-value store, `.file` for jobs queue.
|
||||
/// The job queue is managed internally by Jetzig.
|
||||
pub const job_queue: jetzig.kv.Store.KVOptions = .{
|
||||
.backend = .memory,
|
||||
// .backend = .file,
|
||||
// .file_options = .{
|
||||
// .path = "/path/to/jetkv-queue.db",
|
||||
// .truncate = false, // Set to `true` to clear the store on each server launch.
|
||||
// .address_space_size = jetzig.jetkv.JetKV.FileBackend.addressSpace(4096),
|
||||
// },
|
||||
};
|
||||
|
||||
/// Cache options. Identical to `store` options, but allows using different
|
||||
/// backends (e.g. `.memory` for key-value store, `.file` for cache.
|
||||
pub const cache: jetzig.kv.Store.KVOptions = .{
|
||||
.backend = .memory,
|
||||
// .backend = .file,
|
||||
// .file_options = .{
|
||||
// .path = "/path/to/jetkv-cache.db",
|
||||
// .truncate = false, // Set to `true` to clear the store on each server launch.
|
||||
// .address_space_size = jetzig.jetkv.JetKV.FileBackend.addressSpace(4096),
|
||||
// },
|
||||
};
|
||||
|
||||
/// SMTP configuration for Jetzig Mail. It is recommended to use a local SMTP relay,
|
||||
/// e.g.: https://github.com/juanluisbaptiste/docker-postfix
|
||||
// pub const smtp: jetzig.mail.SMTPConfig = .{
|
||||
|
@ -15,6 +15,7 @@ pub const types = @import("jetzig/types.zig");
|
||||
pub const markdown = @import("jetzig/markdown.zig");
|
||||
pub const jobs = @import("jetzig/jobs.zig");
|
||||
pub const mail = @import("jetzig/mail.zig");
|
||||
pub const kv = @import("jetzig/kv.zig");
|
||||
|
||||
/// The primary interface for a Jetzig application. Create an `App` in your application's
|
||||
/// `src/main.zig` and call `start` to launch the application.
|
||||
@ -99,6 +100,45 @@ pub const config = struct {
|
||||
/// milliseconds.
|
||||
pub const job_worker_sleep_interval_ms: usize = 10;
|
||||
|
||||
/// Key-value store options. Set backend to `.file` to use a file-based store.
|
||||
/// When using `.file` backend, you must also set `.file_options`.
|
||||
/// The key-value store is exposed as `request.store` in views and is also available in as
|
||||
/// `env.store` in all jobs/mailers.
|
||||
pub const store: kv.Store.KVOptions = .{
|
||||
.backend = .memory,
|
||||
// .backend = .file,
|
||||
// .file_options = .{
|
||||
// .path = "/path/to/jetkv-store.db",
|
||||
// .truncate = false, // Set to `true` to clear the store on each server launch.
|
||||
// .address_space_size = jetzig.jetkv.JetKV.FileBackend.addressSpace(4096),
|
||||
// },
|
||||
};
|
||||
|
||||
/// Job queue options. Identical to `store` options, but allows using different
|
||||
/// backends (e.g. `.memory` for key-value store, `.file` for jobs queue.
|
||||
/// The job queue is managed internally by Jetzig.
|
||||
pub const job_queue: kv.Store.KVOptions = .{
|
||||
.backend = .memory,
|
||||
// .backend = .file,
|
||||
// .file_options = .{
|
||||
// .path = "/path/to/jetkv-queue.db",
|
||||
// .truncate = false, // Set to `true` to clear the store on each server launch.
|
||||
// .address_space_size = jetzig.jetkv.JetKV.FileBackend.addressSpace(4096),
|
||||
// },
|
||||
};
|
||||
|
||||
/// Cache. Identical to `store` options, but allows using different
|
||||
/// backends (e.g. `.memory` for key-value store, `.file` for cache.
|
||||
pub const cache: kv.Store.KVOptions = .{
|
||||
.backend = .memory,
|
||||
// .backend = .file,
|
||||
// .file_options = .{
|
||||
// .path = "/path/to/jetkv-cache.db",
|
||||
// .truncate = false, // Set to `true` to clear the store on each server launch.
|
||||
// .address_space_size = jetzig.jetkv.JetKV.FileBackend.addressSpace(4096),
|
||||
// },
|
||||
};
|
||||
|
||||
/// SMTP configuration for Jetzig Mail.
|
||||
pub const smtp: mail.SMTPConfig = .{
|
||||
.port = 25,
|
||||
@ -133,7 +173,7 @@ pub fn init(allocator: std.mem.Allocator) !App {
|
||||
const environment = Environment.init(allocator);
|
||||
|
||||
return .{
|
||||
.server_options = try environment.getServerOptions(),
|
||||
.environment = environment,
|
||||
.allocator = allocator,
|
||||
};
|
||||
}
|
||||
|
@ -7,7 +7,7 @@ const mime_types = @import("mime_types").mime_types; // Generated at build time.
|
||||
|
||||
const App = @This();
|
||||
|
||||
server_options: jetzig.http.Server.ServerOptions,
|
||||
environment: jetzig.Environment,
|
||||
allocator: std.mem.Allocator,
|
||||
|
||||
pub fn deinit(self: App) void {
|
||||
@ -57,9 +57,29 @@ pub fn start(self: App, routes_module: type, options: AppOptions) !void {
|
||||
self.allocator.destroy(route);
|
||||
};
|
||||
|
||||
var jet_kv = jetzig.jetkv.JetKV.init(self.allocator, .{});
|
||||
var store = try jetzig.kv.Store.init(
|
||||
self.allocator,
|
||||
jetzig.config.get(jetzig.kv.Store.KVOptions, "store"),
|
||||
);
|
||||
defer store.deinit();
|
||||
|
||||
if (self.server_options.detach) {
|
||||
var job_queue = try jetzig.kv.Store.init(
|
||||
self.allocator,
|
||||
jetzig.config.get(jetzig.kv.Store.KVOptions, "job_queue"),
|
||||
);
|
||||
defer job_queue.deinit();
|
||||
|
||||
var cache = try jetzig.kv.Store.init(
|
||||
self.allocator,
|
||||
jetzig.config.get(jetzig.kv.Store.KVOptions, "cache"),
|
||||
);
|
||||
defer cache.deinit();
|
||||
|
||||
const server_options = try self.environment.getServerOptions();
|
||||
defer self.allocator.free(server_options.bind);
|
||||
defer self.allocator.free(server_options.secret);
|
||||
|
||||
if (server_options.detach) {
|
||||
const argv = try std.process.argsAlloc(self.allocator);
|
||||
defer std.process.argsFree(self.allocator, argv);
|
||||
var child_argv = std.ArrayList([]const u8).init(self.allocator);
|
||||
@ -76,24 +96,30 @@ pub fn start(self: App, routes_module: type, options: AppOptions) !void {
|
||||
|
||||
var server = jetzig.http.Server.init(
|
||||
self.allocator,
|
||||
self.server_options,
|
||||
server_options,
|
||||
routes.items,
|
||||
&routes_module.jobs,
|
||||
&routes_module.mailers,
|
||||
&mime_map,
|
||||
&jet_kv,
|
||||
&store,
|
||||
&job_queue,
|
||||
&cache,
|
||||
);
|
||||
defer server.deinit();
|
||||
|
||||
var mutex = std.Thread.Mutex{};
|
||||
var worker_pool = jetzig.jobs.Pool.init(
|
||||
self.allocator,
|
||||
&jet_kv,
|
||||
&job_queue,
|
||||
.{
|
||||
.logger = server.logger,
|
||||
.environment = server.options.environment,
|
||||
.routes = routes.items,
|
||||
.jobs = &routes_module.jobs,
|
||||
.mailers = &routes_module.mailers,
|
||||
.store = &store,
|
||||
.cache = &cache,
|
||||
.mutex = &mutex,
|
||||
},
|
||||
);
|
||||
defer worker_pool.deinit();
|
||||
@ -108,13 +134,13 @@ pub fn start(self: App, routes_module: type, options: AppOptions) !void {
|
||||
error.AddressInUse => {
|
||||
try server.logger.ERROR(
|
||||
"Socket unavailable: {s}:{} - unable to start server.\n",
|
||||
.{ self.server_options.bind, self.server_options.port },
|
||||
.{ server_options.bind, server_options.port },
|
||||
);
|
||||
return;
|
||||
},
|
||||
else => {
|
||||
try server.logger.ERROR("Encountered error: {}\nExiting.\n", .{err});
|
||||
return err;
|
||||
std.process.exit(1);
|
||||
},
|
||||
}
|
||||
};
|
||||
|
@ -109,10 +109,10 @@ fn mappingParam(input: []const u8) ?struct { key: []const u8, field: []const u8
|
||||
|
||||
fn dataValue(self: Query, value: ?[]const u8) *jetzig.data.Data.Value {
|
||||
if (value) |item_value| {
|
||||
const duped = self.data.getAllocator().dupe(u8, item_value) catch @panic("OOM");
|
||||
const duped = self.data.allocator().dupe(u8, item_value) catch @panic("OOM");
|
||||
return self.data.string(uriDecode(duped));
|
||||
} else {
|
||||
return jetzig.zmpl.Data._null(self.data.getAllocator());
|
||||
return jetzig.zmpl.Data._null(self.data.allocator());
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -31,6 +31,60 @@ redirected: bool = false,
|
||||
rendered_multiple: bool = false,
|
||||
rendered_view: ?jetzig.views.View = null,
|
||||
start_time: i128,
|
||||
store: ArenaStore,
|
||||
cache: ArenaStore,
|
||||
|
||||
/// Wrapper for KV store that uses the request's arena allocator for fetching values.
|
||||
pub const ArenaStore = struct {
|
||||
allocator: std.mem.Allocator,
|
||||
store: *jetzig.kv.Store,
|
||||
|
||||
/// Put a String or into the key-value store.
|
||||
pub fn get(self: ArenaStore, key: []const u8) !?*jetzig.data.Value {
|
||||
return try self.store.get(try self.data(), key);
|
||||
}
|
||||
|
||||
/// Get a String from the store.
|
||||
pub fn put(self: ArenaStore, key: []const u8, value: *jetzig.data.Value) !void {
|
||||
try self.store.put(key, value);
|
||||
}
|
||||
|
||||
/// Remove a String to from the key-value store and return it if found.
|
||||
pub fn fetchRemove(self: ArenaStore, key: []const u8) !?*jetzig.data.Value {
|
||||
return try self.store.fetchRemove(try self.data(), key);
|
||||
}
|
||||
|
||||
/// Remove a String to from the key-value store.
|
||||
pub fn remove(self: ArenaStore, key: []const u8) !void {
|
||||
try self.store.remove(key);
|
||||
}
|
||||
|
||||
/// Append a Value to the end of an Array in the key-value store.
|
||||
pub fn append(self: ArenaStore, key: []const u8, value: *jetzig.data.Value) !void {
|
||||
try self.store.append(key, value);
|
||||
}
|
||||
|
||||
/// Prepend a Value to the start of an Array in the key-value store.
|
||||
pub fn prepend(self: ArenaStore, key: []const u8, value: *jetzig.data.Value) !void {
|
||||
try self.store.prepend(key, value);
|
||||
}
|
||||
|
||||
/// Pop a String from an Array in the key-value store.
|
||||
pub fn pop(self: ArenaStore, key: []const u8) !?*jetzig.data.Value {
|
||||
return try self.store.pop(try self.data(), key);
|
||||
}
|
||||
|
||||
/// Left-pop a String from an Array in the key-value store.
|
||||
pub fn popFirst(self: ArenaStore, key: []const u8) !?*jetzig.data.Value {
|
||||
return try self.store.popFirst(try self.data(), key);
|
||||
}
|
||||
|
||||
fn data(self: ArenaStore) !*jetzig.data.Data {
|
||||
const arena_data = try self.allocator.create(jetzig.data.Data);
|
||||
arena_data.* = jetzig.data.Data.init(self.allocator);
|
||||
return arena_data;
|
||||
}
|
||||
};
|
||||
|
||||
pub fn init(
|
||||
allocator: std.mem.Allocator,
|
||||
@ -65,11 +119,14 @@ pub fn init(
|
||||
.response_data = response_data,
|
||||
.std_http_request = std_http_request,
|
||||
.start_time = start_time,
|
||||
.store = .{ .store = server.store, .allocator = allocator },
|
||||
.cache = .{ .store = server.cache, .allocator = allocator },
|
||||
};
|
||||
}
|
||||
|
||||
pub fn deinit(self: *Request) void {
|
||||
// self.session.deinit();
|
||||
self.session.deinit();
|
||||
self.cookies.deinit();
|
||||
self.allocator.destroy(self.cookies);
|
||||
self.allocator.destroy(self.session);
|
||||
if (self.processed) self.allocator.free(self.body);
|
||||
@ -275,37 +332,6 @@ fn parseQuery(self: *Request) !*jetzig.data.Value {
|
||||
return self.query_body.?.data.value.?;
|
||||
}
|
||||
|
||||
/// Put a String or Array into the key-value store.
|
||||
/// `T` can be either `jetzig.KVString` or `jetzig.KVArray`
|
||||
pub fn kvPut(
|
||||
self: *Request,
|
||||
comptime value_type: jetzig.jetkv.value_types,
|
||||
key: jetzig.jetkv.types.String,
|
||||
value: jetzig.jetkv.ValueType(value_type),
|
||||
) !void {
|
||||
try self.server.jet_kv.put(value_type, key, value);
|
||||
}
|
||||
|
||||
/// Get a String or Array from the key-value store.
|
||||
/// `T` can be either `jetzig.KVString` or `jetzig.KVArray`
|
||||
pub fn kvGet(
|
||||
self: *Request,
|
||||
comptime value_type: jetzig.jetkv.value_types,
|
||||
key: jetzig.jetkv.types.String,
|
||||
) ?jetzig.jetkv.ValueType(value_type) {
|
||||
return self.server.jet_kv.get(value_type, key);
|
||||
}
|
||||
|
||||
/// Pop a String from an Array in the key-value store.
|
||||
pub fn kvPop(self: *Request, key: jetzig.jetkv.types.String) ?jetzig.jetkv.types.String {
|
||||
return self.server.jet_kv.pop(key);
|
||||
}
|
||||
|
||||
/// Return a new Array suitable for use in the KV store.
|
||||
pub fn kvArray(self: Request) jetzig.jetkv.types.Array {
|
||||
return jetzig.jetkv.types.Array.init(self.allocator);
|
||||
}
|
||||
|
||||
/// Creates a new Job. Receives a job name which must resolve to `src/app/jobs/<name>.zig`
|
||||
/// Call `Job.put(...)` to set job params.
|
||||
/// Call `Job.background()` to run the job outside of the request/response flow.
|
||||
@ -322,7 +348,9 @@ pub fn job(self: *Request, job_name: []const u8) !*jetzig.Job {
|
||||
const background_job = try self.allocator.create(jetzig.Job);
|
||||
background_job.* = jetzig.Job.init(
|
||||
self.allocator,
|
||||
self.server.jet_kv,
|
||||
self.server.store,
|
||||
self.server.job_queue,
|
||||
self.server.cache,
|
||||
self.server.logger,
|
||||
self.server.job_definitions,
|
||||
job_name,
|
||||
@ -375,6 +403,9 @@ const RequestMail = struct {
|
||||
.routes = self.request.server.routes,
|
||||
.mailers = self.request.server.mailer_definitions,
|
||||
.jobs = self.request.server.job_definitions,
|
||||
.store = self.request.server.store,
|
||||
.cache = self.request.server.cache,
|
||||
.mutex = undefined,
|
||||
},
|
||||
),
|
||||
}
|
||||
|
@ -22,7 +22,9 @@ mailer_definitions: []const jetzig.MailerDefinition,
|
||||
mime_map: *jetzig.http.mime.MimeMap,
|
||||
std_net_server: std.net.Server = undefined,
|
||||
initialized: bool = false,
|
||||
jet_kv: *jetzig.jetkv.JetKV,
|
||||
store: *jetzig.kv.Store,
|
||||
job_queue: *jetzig.kv.Store,
|
||||
cache: *jetzig.kv.Store,
|
||||
|
||||
const Server = @This();
|
||||
|
||||
@ -33,7 +35,9 @@ pub fn init(
|
||||
job_definitions: []const jetzig.JobDefinition,
|
||||
mailer_definitions: []const jetzig.MailerDefinition,
|
||||
mime_map: *jetzig.http.mime.MimeMap,
|
||||
jet_kv: *jetzig.jetkv.JetKV,
|
||||
store: *jetzig.kv.Store,
|
||||
job_queue: *jetzig.kv.Store,
|
||||
cache: *jetzig.kv.Store,
|
||||
) Server {
|
||||
return .{
|
||||
.allocator = allocator,
|
||||
@ -43,7 +47,9 @@ pub fn init(
|
||||
.job_definitions = job_definitions,
|
||||
.mailer_definitions = mailer_definitions,
|
||||
.mime_map = mime_map,
|
||||
.jet_kv = jet_kv,
|
||||
.store = store,
|
||||
.job_queue = job_queue,
|
||||
.cache = cache,
|
||||
};
|
||||
}
|
||||
|
||||
|
@ -13,10 +13,15 @@ pub const JobEnv = struct {
|
||||
routes: []*const jetzig.Route,
|
||||
mailers: []const jetzig.MailerDefinition,
|
||||
jobs: []const jetzig.JobDefinition,
|
||||
store: *jetzig.kv.Store,
|
||||
cache: *jetzig.kv.Store,
|
||||
mutex: *std.Thread.Mutex,
|
||||
};
|
||||
|
||||
allocator: std.mem.Allocator,
|
||||
jet_kv: *jetzig.jetkv.JetKV,
|
||||
store: *jetzig.kv.Store,
|
||||
job_queue: *jetzig.kv.Store,
|
||||
cache: *jetzig.kv.Store,
|
||||
logger: jetzig.loggers.Logger,
|
||||
name: []const u8,
|
||||
definition: ?JobDefinition,
|
||||
@ -28,7 +33,9 @@ const Job = @This();
|
||||
/// Initialize a new Job
|
||||
pub fn init(
|
||||
allocator: std.mem.Allocator,
|
||||
jet_kv: *jetzig.jetkv.JetKV,
|
||||
store: *jetzig.kv.Store,
|
||||
job_queue: *jetzig.kv.Store,
|
||||
cache: *jetzig.kv.Store,
|
||||
logger: jetzig.loggers.Logger,
|
||||
jobs: []const JobDefinition,
|
||||
name: []const u8,
|
||||
@ -47,7 +54,9 @@ pub fn init(
|
||||
|
||||
return .{
|
||||
.allocator = allocator,
|
||||
.jet_kv = jet_kv,
|
||||
.store = store,
|
||||
.job_queue = job_queue,
|
||||
.cache = cache,
|
||||
.logger = logger,
|
||||
.name = name,
|
||||
.definition = definition,
|
||||
@ -65,7 +74,6 @@ pub fn deinit(self: *Job) void {
|
||||
/// Add a Job to the queue
|
||||
pub fn schedule(self: *Job) !void {
|
||||
try self.params.put("__jetzig_job_name", self.data.string(self.name));
|
||||
const json = try self.data.toJson();
|
||||
try self.jet_kv.prepend("__jetzig_jobs", json);
|
||||
try self.job_queue.append("__jetzig_jobs", self.data.value.?);
|
||||
try self.logger.INFO("Scheduled job: {s}", .{self.name});
|
||||
}
|
||||
|
@ -5,7 +5,7 @@ const jetzig = @import("../../jetzig.zig");
|
||||
const Pool = @This();
|
||||
|
||||
allocator: std.mem.Allocator,
|
||||
jet_kv: *jetzig.jetkv.JetKV,
|
||||
job_queue: *jetzig.kv.Store,
|
||||
job_env: jetzig.jobs.JobEnv,
|
||||
pool: std.Thread.Pool = undefined,
|
||||
workers: std.ArrayList(*jetzig.jobs.Worker),
|
||||
@ -13,12 +13,12 @@ workers: std.ArrayList(*jetzig.jobs.Worker),
|
||||
/// Initialize a new worker thread pool.
|
||||
pub fn init(
|
||||
allocator: std.mem.Allocator,
|
||||
jet_kv: *jetzig.jetkv.JetKV,
|
||||
job_queue: *jetzig.kv.Store,
|
||||
job_env: jetzig.jobs.JobEnv,
|
||||
) Pool {
|
||||
return .{
|
||||
.allocator = allocator,
|
||||
.jet_kv = jet_kv,
|
||||
.job_queue = job_queue,
|
||||
.job_env = job_env,
|
||||
.workers = std.ArrayList(*jetzig.jobs.Worker).init(allocator),
|
||||
};
|
||||
@ -42,7 +42,7 @@ pub fn work(self: *Pool, threads: usize, interval: usize) !void {
|
||||
self.allocator,
|
||||
self.job_env,
|
||||
index,
|
||||
self.jet_kv,
|
||||
self.job_queue,
|
||||
interval,
|
||||
);
|
||||
try self.workers.append(worker);
|
||||
|
@ -6,21 +6,21 @@ const Worker = @This();
|
||||
allocator: std.mem.Allocator,
|
||||
job_env: jetzig.jobs.JobEnv,
|
||||
id: usize,
|
||||
jet_kv: *jetzig.jetkv.JetKV,
|
||||
job_queue: *jetzig.kv.Store,
|
||||
interval: usize,
|
||||
|
||||
pub fn init(
|
||||
allocator: std.mem.Allocator,
|
||||
job_env: jetzig.jobs.JobEnv,
|
||||
id: usize,
|
||||
jet_kv: *jetzig.jetkv.JetKV,
|
||||
job_queue: *jetzig.kv.Store,
|
||||
interval: usize,
|
||||
) Worker {
|
||||
return .{
|
||||
.allocator = allocator,
|
||||
.job_env = job_env,
|
||||
.id = id,
|
||||
.jet_kv = jet_kv,
|
||||
.job_queue = job_queue,
|
||||
.interval = interval * 1000 * 1000, // millisecond => nanosecond
|
||||
};
|
||||
}
|
||||
@ -30,10 +30,16 @@ pub fn work(self: *const Worker) void {
|
||||
self.log(.INFO, "[worker-{}] Job worker started.", .{self.id});
|
||||
|
||||
while (true) {
|
||||
if (self.jet_kv.pop("__jetzig_jobs")) |json| {
|
||||
defer self.allocator.free(json);
|
||||
if (self.matchJob(json)) |job_definition| {
|
||||
self.processJob(job_definition, json);
|
||||
var data = jetzig.data.Data.init(self.allocator);
|
||||
defer data.deinit();
|
||||
const maybe_value = self.job_queue.popFirst(&data, "__jetzig_jobs") catch |err| blk: {
|
||||
self.log(.ERROR, "Error fetching job from queue: {s}", .{@errorName(err)});
|
||||
break :blk null; // FIXME: Probably close thread here ?
|
||||
};
|
||||
|
||||
if (maybe_value) |value| {
|
||||
if (self.matchJob(value)) |job_definition| {
|
||||
self.processJob(job_definition, value);
|
||||
}
|
||||
} else {
|
||||
std.time.sleep(self.interval);
|
||||
@ -44,27 +50,19 @@ pub fn work(self: *const Worker) void {
|
||||
}
|
||||
|
||||
// Do a minimal parse of JSON job data to identify job name, then match on known job definitions.
|
||||
fn matchJob(self: Worker, json: []const u8) ?jetzig.jobs.JobDefinition {
|
||||
const parsed_json = std.json.parseFromSlice(
|
||||
struct { __jetzig_job_name: []const u8 },
|
||||
self.allocator,
|
||||
json,
|
||||
.{ .ignore_unknown_fields = true },
|
||||
) catch |err| {
|
||||
fn matchJob(self: Worker, value: *const jetzig.data.Value) ?jetzig.jobs.JobDefinition {
|
||||
const job_name = value.getT(.string, "__jetzig_job_name") orelse {
|
||||
self.log(
|
||||
.ERROR,
|
||||
"[worker-{}] Error parsing JSON from job queue: {s}",
|
||||
.{ self.id, @errorName(err) },
|
||||
"[worker-{}] Missing expected job name field `__jetzig_job_name`",
|
||||
.{self.id},
|
||||
);
|
||||
return null;
|
||||
};
|
||||
|
||||
const job_name = parsed_json.value.__jetzig_job_name;
|
||||
|
||||
// TODO: Hashmap
|
||||
for (self.job_env.jobs) |job_definition| {
|
||||
if (std.mem.eql(u8, job_definition.name, job_name)) {
|
||||
parsed_json.deinit();
|
||||
return job_definition;
|
||||
}
|
||||
} else {
|
||||
@ -75,34 +73,19 @@ fn matchJob(self: Worker, json: []const u8) ?jetzig.jobs.JobDefinition {
|
||||
|
||||
// Fully parse JSON job data and invoke the defined job's run function, passing the parsed params
|
||||
// as a `*jetzig.data.Value`.
|
||||
fn processJob(self: Worker, job_definition: jetzig.JobDefinition, json: []const u8) void {
|
||||
var data = jetzig.data.Data.init(self.allocator);
|
||||
defer data.deinit();
|
||||
|
||||
data.fromJson(json) catch |err| {
|
||||
self.log(
|
||||
.INFO,
|
||||
"[worker-{}] Error parsing JSON for job `{s}`: {s}",
|
||||
.{ self.id, job_definition.name, @errorName(err) },
|
||||
);
|
||||
};
|
||||
|
||||
fn processJob(self: Worker, job_definition: jetzig.JobDefinition, params: *jetzig.data.Value) void {
|
||||
var arena = std.heap.ArenaAllocator.init(self.allocator);
|
||||
defer arena.deinit();
|
||||
|
||||
if (data.value) |params| {
|
||||
job_definition.runFn(arena.allocator(), params, self.job_env) catch |err| {
|
||||
self.log(
|
||||
.ERROR,
|
||||
"[worker-{}] Encountered error processing job `{s}`: {s}",
|
||||
.{ self.id, job_definition.name, @errorName(err) },
|
||||
);
|
||||
return;
|
||||
};
|
||||
self.log(.INFO, "[worker-{}] Job completed: {s}", .{ self.id, job_definition.name });
|
||||
} else {
|
||||
self.log(.ERROR, "Error in job params definition for job: {s}", .{job_definition.name});
|
||||
}
|
||||
job_definition.runFn(arena.allocator(), params, self.job_env) catch |err| {
|
||||
self.log(
|
||||
.ERROR,
|
||||
"[worker-{}] Encountered error processing job `{s}`: {s}",
|
||||
.{ self.id, job_definition.name, @errorName(err) },
|
||||
);
|
||||
return;
|
||||
};
|
||||
self.log(.INFO, "[worker-{}] Job completed: {s}", .{ self.id, job_definition.name });
|
||||
}
|
||||
|
||||
// Log with error handling and fallback. Prefix with worker ID.
|
||||
|
3
src/jetzig/kv.zig
Normal file
3
src/jetzig/kv.zig
Normal file
@ -0,0 +1,3 @@
|
||||
const std = @import("std");
|
||||
|
||||
pub const Store = @import("kv/Store.zig");
|
94
src/jetzig/kv/Store.zig
Normal file
94
src/jetzig/kv/Store.zig
Normal file
@ -0,0 +1,94 @@
|
||||
const std = @import("std");
|
||||
const jetzig = @import("../../jetzig.zig");
|
||||
|
||||
const Store = @This();
|
||||
|
||||
store: jetzig.jetkv.JetKV,
|
||||
options: KVOptions,
|
||||
|
||||
pub const KVOptions = struct {
|
||||
backend: enum { memory, file } = .memory,
|
||||
file_options: struct {
|
||||
path: ?[]const u8 = null,
|
||||
address_space_size: u32 = jetzig.jetkv.JetKV.FileBackend.addressSpace(4096),
|
||||
truncate: bool = false,
|
||||
} = .{},
|
||||
};
|
||||
|
||||
const ValueType = enum { string, array };
|
||||
|
||||
/// Initialize a new memory or file store.
|
||||
pub fn init(allocator: std.mem.Allocator, options: KVOptions) !Store {
|
||||
const store = try jetzig.jetkv.JetKV.init(
|
||||
allocator,
|
||||
switch (options.backend) {
|
||||
.file => .{
|
||||
.backend = .file,
|
||||
.file_backend_options = .{
|
||||
.path = options.file_options.path,
|
||||
.address_space_size = options.file_options.address_space_size,
|
||||
.truncate = options.file_options.truncate,
|
||||
},
|
||||
},
|
||||
.memory => .{
|
||||
.backend = .memory,
|
||||
},
|
||||
},
|
||||
);
|
||||
|
||||
return .{ .store = store, .options = options };
|
||||
}
|
||||
|
||||
/// Free allocated resources/close database file.
|
||||
pub fn deinit(self: *Store) void {
|
||||
self.store.deinit();
|
||||
}
|
||||
|
||||
/// Put a Value or into the key-value store.
|
||||
pub fn put(self: *Store, key: []const u8, value: *jetzig.data.Value) !void {
|
||||
try self.store.put(key, try value.toJson());
|
||||
}
|
||||
|
||||
/// Get a Value from the store.
|
||||
pub fn get(self: *Store, data: *jetzig.data.Data, key: []const u8) !?*jetzig.data.Value {
|
||||
return try parseValue(data, try self.store.get(data.allocator(), key));
|
||||
}
|
||||
|
||||
/// Remove a Value to from the key-value store and return it if found.
|
||||
pub fn fetchRemove(self: *Store, data: *jetzig.data.Data, key: []const u8) !?*jetzig.data.Value {
|
||||
return try parseValue(data, try self.store.fetchRemove(data.allocator(), key));
|
||||
}
|
||||
|
||||
/// Remove a Value to from the key-value store.
|
||||
pub fn remove(self: *Store, key: []const u8) !void {
|
||||
try self.store.remove(key);
|
||||
}
|
||||
|
||||
/// Append a Value to the end of an Array in the key-value store.
|
||||
pub fn append(self: *Store, key: []const u8, value: *const jetzig.data.Value) !void {
|
||||
try self.store.append(key, try value.toJson());
|
||||
}
|
||||
|
||||
/// Prepend a Value to the start of an Array in the key-value store.
|
||||
pub fn prepend(self: *Store, key: []const u8, value: *const jetzig.data.Value) !void {
|
||||
try self.store.prepend(key, try value.toJson());
|
||||
}
|
||||
|
||||
/// Pop a Value from an Array in the key-value store.
|
||||
pub fn pop(self: *Store, data: *jetzig.data.Data, key: []const u8) !?*jetzig.data.Value {
|
||||
return try parseValue(data, try self.store.pop(data.allocator(), key));
|
||||
}
|
||||
|
||||
/// Left-pop a Value from an Array in the key-value store.
|
||||
pub fn popFirst(self: *Store, data: *jetzig.data.Data, key: []const u8) !?*jetzig.data.Value {
|
||||
return try parseValue(data, try self.store.popFirst(data.allocator(), key));
|
||||
}
|
||||
|
||||
fn parseValue(data: *jetzig.data.Data, maybe_json: ?[]const u8) !?*jetzig.data.Value {
|
||||
if (maybe_json) |json| {
|
||||
try data.fromJson(json);
|
||||
return data.value.?;
|
||||
} else {
|
||||
return null;
|
||||
}
|
||||
}
|
@ -65,7 +65,7 @@ pub fn initParams(self: *Route, allocator: std.mem.Allocator) !void {
|
||||
pub fn deinitParams(self: *const Route) void {
|
||||
for (self.params.items) |data| {
|
||||
data.deinit();
|
||||
data._allocator.destroy(data);
|
||||
data.parent_allocator.destroy(data);
|
||||
}
|
||||
self.params.deinit();
|
||||
}
|
||||
|
Loading…
x
Reference in New Issue
Block a user