import 'dart:io';

import 'package:archive/archive.dart';
import 'package:file_upload_processor/handlers/base_api.dart';
import 'package:http_parser/http_parser.dart';
import 'package:intl/intl.dart';
import 'package:mime/mime.dart';
import 'package:path/path.dart' as path;
import 'package:shelf/shelf.dart' as shelf;
import 'package:supabase/supabase.dart';
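
/// Shelf handler for multipart file uploads: writes the raw upload (and, for
/// ZIP archives, the extracted entries) under [uploadFolder], mirrors the raw
/// file to Supabase Storage, and runs each extracted file through
/// [FileProcess].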
class FileUploadApi extends BaseApi {
  FileUploadApi(shelf.Request request) : super(request);

  final uploadFolder = "./uploaded";
  String get rawFolder => "$uploadFolder/raw";
  String get dataFolder => "$uploadFolder/data";
  String workingFolder = "";
  String get zipFolder => "$rawFolder/$workingFolder";
  String get extFolder => "$dataFolder/$workingFolder";
  SupabaseClient getSupabaseClient(shelf.Request request) {
    final supabaseUrl = request.headers['supabase-url'];
    final authHeader = request.headers['authorization'];
    // Both headers are validated in response() before this is called, so the
    // null assertions below are safe.
    final token = authHeader!.substring(7); // Remove the 'Bearer ' prefix.
    return SupabaseClient(
      supabaseUrl!,
      token,
    );
  }
  String getTimestampedFilename(String originalFilename) {
    final timestamp = DateFormat('yyyyMMdd_HHmmss').format(DateTime.now());
    return '${timestamp}_$originalFilename';
  }
  Future<void> uploadToSupabase(
      String filePath, String filename, SupabaseClient supabaseClient,
      {bool timestamped = false,
      required String bucket,
      bool upsert = true}) async {
    try {
      final file = File(filePath);
      final timestampedFilename =
          timestamped ? getTimestampedFilename(filename) : filename;
      await supabaseClient.storage.from(bucket).upload(
            timestampedFilename,
            file,
            fileOptions: FileOptions(
              cacheControl: '3600',
              upsert: upsert,
            ),
          );
      print('+File uploaded to <$bucket>: $timestampedFilename');
    } catch (e) {
      print('!Error uploading to Supabase: $e');
      rethrow;
    }
  }
  Future<void> initializeDirectories() async {
    final directories = [
      Directory(rawFolder),
      Directory(dataFolder),
      Directory(zipFolder),
      Directory(extFolder),
    ];
    for (var dir in directories) {
      if (!await dir.exists()) {
        await dir.create(recursive: true);
      }
    }
  }
  bool isZipFile(List<int> bytes) {
    if (bytes.length < 4) return false;
    // ZIP local file header signature: "PK\x03\x04".
    return bytes[0] == 0x50 &&
        bytes[1] == 0x4B &&
        bytes[2] == 0x03 &&
        bytes[3] == 0x04;
  }
  Future<List<String>> processZipFile(String filePath) async {
    List<String> files = [];
    final bytes = await File(filePath).readAsBytes();
    final archive = ZipDecoder().decodeBytes(bytes);
    for (final file in archive) {
      final filename = file.name;
      if (file.isFile) {
        // NOTE: entry names are used as-is, so a hostile archive could write
        // outside extFolder (zip-slip); acceptable only for trusted uploads.
        final data = file.content as List<int>;
        final outFile = File(path.join(extFolder, filename));
        await outFile.parent.create(recursive: true);
        await outFile.writeAsBytes(data);
        files.add(path.join(extFolder, filename));
      }
    }
    return files;
  }
  @override
  Future<shelf.Response> response() async {
    final supabaseUrl = request.headers['supabase-url'];
    final authHeader = request.headers['authorization'];
    if (authHeader == null || !authHeader.startsWith('Bearer ')) {
      return shelf.Response.badRequest(
          body: 'Invalid or missing Authorization Bearer token');
    }
    if (supabaseUrl == null) {
      return shelf.Response.badRequest(
          body: 'Supabase URL not provided in headers');
    }
    workingFolder = DateTime.now().millisecondsSinceEpoch.toString();
    final supabaseClient = getSupabaseClient(request);
    await initializeDirectories();
    final contentType = request.headers['content-type'];
    if (contentType == null ||
        !contentType.toLowerCase().startsWith('multipart/form-data')) {
      return shelf.Response.badRequest(
          body: 'Content-Type must be multipart/form-data');
    }
    try {
      final mediaType = MediaType.parse(contentType);
      final boundary = mediaType.parameters['boundary'];
      if (boundary == null) {
        return shelf.Response.badRequest(body: 'Boundary not found');
      }
      final transformer = MimeMultipartTransformer(boundary);
      // Buffer the whole body in memory before re-streaming it to the
      // multipart parser.
      final bodyBytes = await request.read().expand((e) => e).toList();
      final stream = Stream.fromIterable([bodyBytes]);
      final parts = await transformer.bind(stream).toList();
      for (var part in parts) {
        final contentDisposition = part.headers['content-disposition'];
        if (contentDisposition == null) continue;
        final filenameMatch =
            RegExp(r'filename="([^"]*)"').firstMatch(contentDisposition);
        if (filenameMatch == null) continue;
        final filename = filenameMatch.group(1);
        if (filename == null) continue;
        final bytes = await part.fold<List<int>>(
          [],
          (prev, element) => [...prev, ...element],
        );
        final rawFilePath = path.join(zipFolder, filename);
        await File(rawFilePath).writeAsBytes(bytes);
        List<String> files = [];
        if (isZipFile(bytes)) {
          files.addAll(await processZipFile(rawFilePath));
        } else {
          final dataFilePath = path.join(extFolder, filename);
          await File(rawFilePath).copy(dataFilePath);
          files.add(dataFilePath);
        }
        bytes.clear(); // Free the buffered upload.
        // Upload the raw file to Supabase Storage.
        await uploadToSupabase(rawFilePath, filename, supabaseClient,
            bucket: 'csvhich', timestamped: false, upsert: true);
        // Upload a timestamped copy to the archive bucket.
        await uploadToSupabase(rawFilePath, filename, supabaseClient,
            bucket: 'csvhich_archive', timestamped: true, upsert: false);
        // Record the upload in the csvhichupdates table.
        await supabaseClient
            .from('csvhichupdates')
            .insert({'filename': filename});
        for (var file in files) {
          final fileProcess = FileProcess(file, supabaseClient);
          await fileProcess.go(donttouchdb: false);
        }
      }
      return shelf.Response.ok('File processed and uploaded successfully');
    } catch (e, stackTrace) {
      print('!Error processing upload: $e\n$stackTrace');
      return shelf.Response.internalServerError(
          body: 'Error processing upload: $e');
    } finally {
      supabaseClient.dispose();
      // The working folders are directories; delete them recursively.
      await Directory(zipFolder).delete(recursive: true);
      await Directory(extFolder).delete(recursive: true);
    }
  }
}
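
/// Diffs one exported CSV/TXT file against its Supabase table, scope by
/// scope (per day, date, or crew code): applies inserts and deletes, writes
/// the changes to the *_log tables, and feeds the tracker tables that record
/// per-key additions and removals.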
class FileProcess {
  FileProcess(this.filepath, this.supabase);

  final String filepath;
  final SupabaseClient supabase;
  String get filename => filepath.replaceAll('\\', "/").split("/").last;

  final Map<String, String> tables = {
    "secondprgtype.txt": "aclegs_csv",
    "ExportPGRGPNmois.txt": "pnlegs_csv",
    "exportPGRGPN.txt": "pnlegs_csv",
    "exportlicence.txt": "licences_csv",
  };
  final Map<String, List<String>> _headers = {
    "secondprgtype.txt": [
      "leg_no", "fn_carrier", "fn_number", "fn_suffix", "day_of_origin",
      "ac_owner", "ac_subtype", "ac_version", "ac_registration",
      "dep_ap_actual", "dep_ap_sched", "dep_dt_est", "dep_sched_dt",
      "arr_ap_actual", "arr_ap_sched", "arr_dt_est", "arr_sched_dt",
      "slot_time_actual", "leg_type", "status", "employer_cockpit",
      "employer_cabin", "cycles", "delay_code_01", "delay_code_02",
      "delay_code_03", "delay_code_04", "delay_time_01", "delay_time_02",
      "delay_time_03", "delay_time_04", "subdelay_code_01", "subdelay_code_02",
      "subdelay_code_03", "subdelay_code_04", "pax_booked_c", "pax_booked_y",
      "pax_booked_trs_c", "pax_booked_trs_y", "pad_booked_c", "pad_booked_y",
      "offblock_dt_a", "airborne_dt_a", "landing_dt_a", "onblock_dt_a",
      "offblock_dt_f", "airborne_dt_f", "landing_dt_f", "onblock_dt_f",
      "offblock_dt_m", "airborne_dt_m", "landing_dt_m", "onblock_dt_m",
      "eet",
    ],
    "exportPGRGPN.txt": [
      "date", "tlc", "actype", "al", "fnum", "ddep", "hdep", "ddes", "hdes",
      "dep", "des", "label", "type",
    ],
    "ExportPGRGPNmois.txt": [
      "date", "tlc", "actype", "al", "fnum", "ddep", "hdep", "ddes", "hdes",
      "dep", "des", "label", "type",
    ],
    "exportlicence.txt": [
      "tlc", "lname", "mname", "fname", "expire", "ac", "college", "base",
    ],
  };
  // Column that partitions each file into comparison scopes.
  final Map<String, String> scopes = {
    "secondprgtype.txt": "day_of_origin",
    "exportPGRGPN.txt": "date",
    "ExportPGRGPNmois.txt": "date",
    "exportlicence.txt": "tlc",
  };

  final Map<String, String> logTables = {
    "secondprgtype.txt": "aclegs_csv_log",
    "ExportPGRGPNmois.txt": "pnlegs_csv_log",
    "exportPGRGPN.txt": "pnlegs_csv_log",
    "exportlicence.txt": "licences_csv_log",
  };
  // Trackers per file: each entry names a target table, the columns that
  // form the grouping key, and the columns whose additions/removals are
  // tracked.
  final Map<String, List<Map<String, dynamic>>> trackers = {
    "secondprgtype.txt": [
      {
        "table": "aclegs_log_reg",
        "groupby": [
          "day_of_origin",
          "dep_sched_dt",
          "fn_carrier",
          "fn_number",
          "dep_ap_sched",
          "arr_ap_sched",
          // "dep_ap_actual",
          // "arr_ap_actual"
        ],
        "track": [
          "ac_registration",
        ]
      },
      {
        "table": "aclegs_log_leg",
        "groupby": [
          "day_of_origin",
          "dep_sched_dt",
          "fn_carrier",
          "fn_number",
          "dep_ap_sched",
          "arr_ap_sched",
        ],
        "track": [
          "dep_ap_actual",
          "arr_ap_actual",
        ]
      }
    ],
    "exportPGRGPN.txt": [
      {
        "table": "pnlegs_log_roster",
        "groupby": ["date", "tlc"],
        "track": ["dep", "des", "al", "fnum", "label"]
      },
      {
        "table": "pnlegs_log_duty",
        "groupby": ["date", "dep", "des", "al", "fnum", "label"],
        "track": ["tlc"]
      },
      {
        "table": "pnlegs_log_sched",
        "groupby": ["date", "dep", "des", "al", "fnum", "label"],
        "track": ["hdep", "hdes"]
      },
    ],
    "ExportPGRGPNmois.txt": [
      {
        "table": "pnlegs_log_roster",
        "groupby": ["date", "tlc"],
        "track": ["dep", "des", "al", "fnum", "label"]
      },
      {
        "table": "pnlegs_log_duty",
        "groupby": ["date", "dep", "des", "al", "fnum", "label"],
        "track": ["tlc"]
      },
      {
        "table": "pnlegs_log_sched",
        "groupby": ["date", "dep", "des", "al", "fnum", "label"],
        "track": ["hdep", "hdes"]
      },
    ],
    "exportlicence.txt": [
      {
        "table": "licences_log_qualif",
        "groupby": ["tlc", "fname", "mname", "lname"],
        "track": ["ac", "college", "base"]
      }
    ],
  };
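
  /// Naive CSV parser: splits each line on commas (no quoted-comma
  /// handling), maps values onto the expected headers for this file, and
  /// skips lines whose column count does not match.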
  Future<List<Map<String, dynamic>>> parseCsv() async {
    final headers = _headers[filename] ?? [];
    if (headers.isEmpty) {
      throw Exception('No headers found for file: $filename');
    }
    List<Map<String, dynamic>> data = [];
    final file = File(filepath);
    final lines = await file.readAsLines();
    for (int i = 0; i < lines.length; i++) {
      final values = lines[i].split(',');
      if (values.length != headers.length) {
        // Skip lines with an incorrect number of values.
        continue;
      }
      // Map each value onto its header, stripping quotes and blanking
      // empty strings to null.
      Map<String, dynamic> row = {};
      for (int j = 0; j < headers.length; j++) {
        row[headers[j]] = values[j].trim().removeQuotes.trim().nullIfEmpty;
      }
      data.add(row);
    }
    return data;
  }
  List<String> get filesToMonitor => _headers.keys.toList();
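
  /// Synchronises the parsed file with its Supabase table one scope value at
  /// a time: unchanged rows are kept, vanished rows are deleted, new rows are
  /// inserted, and every change is written to the log and tracker tables.
  /// When [donttouchdb] is true the comparison runs but nothing is written.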
  Future<void> go({bool donttouchdb = false}) async {
    if (!filesToMonitor.contains(filename)) return;
    final allmapsToInsert = await parseCsv();
    final scopeName = scopes[filename] ?? "";
    final scopesInNew = allmapsToInsert
        .fold(<String>{}, (t, e) => t..add(e[scopeName] ?? "")).toList()
      ..sort((a, b) => _parseDate(a).compareTo(_parseDate(b)));
    // Special case for the PGRGPN exports: collect the crew codes (tlc)
    // present in the new file.
    Set<String> tlcs = {};
    for (var scopeInNew in scopesInNew) {
      final mapsToInsert =
          allmapsToInsert.where((e) => e[scopeName] == scopeInNew).toList();
      List<Map<String, dynamic>> oldIds = [];
      List<Map<String, dynamic>> oldComparable = [];
      // Load the existing rows for this scope.
      final res = await supabase
          .from(tables[filename]!)
          .select()
          .eq(scopeName, scopeInNew)
          .limit(300000);
      oldIds.addAll(res.map((e) => {"id": e["id"]}));
      oldComparable.addAll(res.map((e) => e..remove("id")));
      final comparisonResult = compareLists(oldComparable, mapsToInsert);
      List<int> indexToRemove = comparisonResult.removeIndices;
      List<int> indexToMaintain = comparisonResult.maintainIndices;
      final dataToInsert = comparisonResult.insertData;
      // Special case for the PGRGPN exports: keep old rows whose tlc does
      // not appear anywhere in the new file (that crew was not exported).
      List<int> indexRemovedToMaintain = [];
      if (filename == "exportPGRGPN.txt" ||
          filename == "ExportPGRGPNmois.txt") {
        for (final data in mapsToInsert) {
          tlcs.add(data["tlc"] ?? "");
        }
        indexRemovedToMaintain = indexToRemove
            .where((e) => !tlcs.contains(oldComparable[e]["tlc"]))
            .toList();
        for (var e in indexRemovedToMaintain) {
          indexToMaintain.add(e);
          indexToRemove.remove(e);
        }
      }
      try {
        if (!donttouchdb) {
          for (var e in chunkList(
              indexToRemove.map((f) => oldIds[f]['id']).toList(), 100)) {
            await supabase.from(tables[filename]!).delete().inFilter('id', e);
          }
        }
        // Insert the new data.
        if (!donttouchdb) {
          await supabase.from(tables[filename]!).insert(dataToInsert);
        }
      } catch (e, stackTrace) {
        print('Error: $e\n$stackTrace');
      }
      print(
          " Scope:$scopeInNew insert:${dataToInsert.length} remove:${indexToRemove.length} maintain:${indexToMaintain.length}");
      // Log the changes into the log tables.
      final logTable = logTables[filename]!;
      final logData = dataToInsert
          .map((e) => {"scope": scopeInNew, "data": e, "action": "insert"})
          .toList();
      for (var e in chunkList(
          indexToRemove.map((f) => oldComparable[f]).toList(), 100)) {
        if (!donttouchdb) {
          await supabase.from(logTable).insert(e
              .map((e) =>
                  {"scope": scopeInNew, "data": e, "action": "delete"})
              .toList());
        }
      }
      for (var e in chunkList(logData, 100)) {
        if (!donttouchdb) await supabase.from(logTable).insert(e);
      }
      // Log the tracking data. Rows that were "removed but maintained" above
      // still count as present for the trackers.
      final List<Map<String, dynamic>> mapsToInsertNoDelete =
          List.from(mapsToInsert)
            ..addAll(indexRemovedToMaintain.map((e) => oldComparable[e]));
      for (var tracker in trackers[filename] ?? []) {
        final String table = tracker["table"];
        final List<String> groupby = tracker["groupby"] ?? [];
        final List<String> track = tracker["track"] ?? [];
        final stateOld = oldComparable.groupBy(
            (e) => groupby.map((f) => e[f]).join("|"),
            dataFunction: (e) =>
                e.filterKeys(track).values.map((j) => j ?? "").join("|"));
        final stateNew = mapsToInsertNoDelete.groupBy(
            (e) => groupby.map((f) => e[f]).join("|"),
            dataFunction: (e) =>
                e.filterKeys(track).values.map((j) => j ?? "").join("|"));
        List<Map<String, dynamic>> logs = [];
        for (var key
            in (stateOld.keys.toList()..addAll(stateNew.keys)).toSet()) {
          final (add, remove) =
              (stateNew[key] ?? []).compareWith(stateOld[key] ?? []);
          if (add.isNotEmpty || remove.isNotEmpty) {
            final row = {
              "key": list2map(groupby, key.split("|")),
              "add": add.isNotEmpty
                  ? add.map((e) => list2map(track, e.split("|"))).toList()
                  : [],
              "remove": remove.isNotEmpty
                  ? remove.map((e) => list2map(track, e.split("|"))).toList()
                  : [],
            };
            logs.add(row);
          }
        }
        for (var e in chunkList(logs, 100)) {
          if (!donttouchdb) await supabase.from(table).insert(e);
        }
      }
    }
  }
  Map<String, dynamic> list2map(List<String> keys, List<dynamic> data) {
    Map<String, dynamic> map = {};
    for (var i = 0; i < keys.length; i++) {
      map[keys[i]] = data[i];
    }
    return map;
  }
  // Compare two lists of maps: returns which indices of [map1] to maintain
  // or remove, plus the entries of [map2] that are new and must be inserted.
  ({
    List<int> maintainIndices,
    List<int> removeIndices,
    List<Map<String, dynamic>> insertData
  }) compareLists(
      List<Map<String, dynamic>> map1, List<Map<String, dynamic>> map2) {
    List<int> maintainIndices = [];
    List<int> removeIndices = [];
    List<Map<String, dynamic>> insertData = List.from(map2);
    for (int i = 0; i < map1.length; i++) {
      final pos = insertData.findMap(map1[i]);
      if (pos > -1) {
        maintainIndices.add(i); // Item exists in both lists.
        insertData.removeAt(pos);
      } else {
        removeIndices.add(i); // Item does not exist in map2.
      }
    }
    return (
      maintainIndices: maintainIndices,
      removeIndices: removeIndices,
      insertData: insertData
    );
  }
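
  /// Splits [list] into consecutive chunks of at most [chunkSize] elements.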
  List<List<T>> chunkList<T>(List<T> list, int chunkSize) {
    if (chunkSize <= 0) {
      throw ArgumentError('chunkSize must be greater than 0');
    }
    List<List<T>> chunks = [];
    for (var i = 0; i < list.length; i += chunkSize) {
      chunks.add(list.sublist(
          i, i + chunkSize > list.length ? list.length : i + chunkSize));
    }
    return chunks;
  }
}
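
/// Parses a dd/MM/yyyy date for scope sorting; any other shape is returned
/// unchanged so plain string comparison applies (all scope values of a given
/// file are assumed to share one format).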
dynamic _parseDate(String date) {
  final parts = date.split('/');
  if (parts.length == 3) {
    return DateTime(
      int.parse(parts[2]), // year
      int.parse(parts[1]), // month
      int.parse(parts[0]), // day
    );
  } else {
    return date;
  }
}
extension CompareIterables<T> on Iterable<T> {
  /// Compares this iterable with [other] and returns a record of:
  /// - `add`: items in this iterable but not in [other];
  /// - `remove`: items in [other] but not in this iterable.
  (Iterable<T> add, Iterable<T> remove) compareWith(Iterable<T> other) {
    final Set<T> thisSet = toSet();
    final Set<T> otherSet = other.toSet();
    final Set<T> removed = otherSet.difference(thisSet);
    final Set<T> added = thisSet.difference(otherSet);
    return (added, removed);
  }
}
extension FilterMapByKeys<K, V> on Map<K, V> {
  /// Returns a new map containing only the entries whose keys appear in
  /// [keysToKeep].
  Map<K, V?> filterKeys(List<K> keysToKeep) {
    final filteredMap = <K, V?>{};
    for (final key in keysToKeep) {
      if (containsKey(key)) {
        filteredMap[key] = this[key];
      }
    }
    return filteredMap;
  }
}
extension RemoveNull<T> on Iterable<T?> {
  /// Returns a new iterable with all null values removed.
  Iterable<T> removeNull() {
    return where((element) => element != null).cast<T>();
  }
}
extension GroupBy<T> on Iterable<T> {
  Map<K, List> groupBy<K>(K Function(T) keyFunction,
      {Function(T)? dataFunction, bool Function(T)? keyIsNullFunction}) {
    final map = <K, List>{};
    for (final element in this) {
      final key = keyFunction(element);
      final keyIsNull =
          keyIsNullFunction == null ? false : keyIsNullFunction(element);
      if (keyIsNull || key == null) continue;
      if (dataFunction != null) {
        map.putIfAbsent(key, () => []).add(dataFunction(element));
      } else {
        map.putIfAbsent(key, () => []).add(element);
      }
    }
    return map;
  }
}
extension NullIfEmpty on String {
  String? get nullIfEmpty => isEmpty ? null : this;
}
extension RemoveQuotes on String {
  /// Strips a single leading and/or trailing quote (double or single).
  String get removeQuotes {
    if (isEmpty) return this;
    String result = this;
    if (result.startsWith('"') || result.startsWith("'")) {
      result = result.substring(1);
    }
    if (result.endsWith('"') || result.endsWith("'")) {
      result = result.substring(0, result.length - 1);
    }
    return result;
  }
}
bool mapsAreEqual(Map<String, dynamic> map1, Map<String, dynamic> map2) {
  if (map1.length != map2.length) return false;
  for (var key in map1.keys) {
    if (map1[key] != map2[key]) return false;
  }
  return true;
}
extension ContainsMap on List<Map<String, dynamic>> {
  bool containsMap(Map<String, dynamic> map) {
    for (var item in this) {
      if (mapsAreEqual(item, map)) return true;
    }
    return false;
  }

  /// Returns the index of the first map equal to [map], or -1 if none.
  int findMap(Map<String, dynamic> map) {
    for (int i = 0; i < length; i++) {
      if (mapsAreEqual(this[i], map)) return i;
    }
    return -1;
  }
}
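
// Usage sketch (assumptions: `BaseApi` stores the request and exposes
// `response()` as the shelf handler entry point; host and port are
// illustrative, not taken from the project):
//
//   import 'package:shelf/shelf_io.dart' as io;
//
//   Future<void> main() async {
//     final handler = const shelf.Pipeline()
//         .addMiddleware(shelf.logRequests())
//         .addHandler((request) => FileUploadApi(request).response());
//     await io.serve(handler, 'localhost', 8080);
//   }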
|