import 'dart:io'; import 'package:file_upload_processor/handlers/base_api.dart'; import 'package:intl/intl.dart'; import 'package:mime/mime.dart'; import 'package:shelf/shelf.dart' as shelf; import 'package:path/path.dart' as path; import 'package:http_parser/http_parser.dart'; import 'package:archive/archive.dart'; import 'package:supabase/supabase.dart'; class FileUploadApi extends BaseApi { FileUploadApi(shelf.Request request) : super(request); final uploadFolder = "./uploaded"; String get rawFolder => "$uploadFolder/raw"; String get dataFolder => "$uploadFolder/data"; String workingFolder = ""; String get zipFolder => "$rawFolder/$workingFolder"; String get extFolder => "$dataFolder/$workingFolder"; SupabaseClient getSupabaseClient(shelf.Request request) { final supabaseUrl = request.headers['supabase-url']; if (supabaseUrl == null) { throw Exception('Supabase URL not provided in headers'); } final authHeader = request.headers['authorization']; if (authHeader == null || !authHeader.startsWith('Bearer ')) { throw Exception('Invalid or missing Authorization Bearer token'); } final token = authHeader.substring(7); // Remove 'Bearer ' prefix return SupabaseClient( supabaseUrl, token, ); } String getTimestampedFilename(String originalFilename) { final timestamp = DateFormat('yyyyMMdd_HHmmss').format(DateTime.now()); return '${timestamp}_$originalFilename'; } Future uploadToSupabase( String filePath, String filename, SupabaseClient supabaseClient, {bool timestamped = false, required String bucket, bool upsert = true}) async { try { final file = File(filePath); final timestampedFilename = timestamped ? getTimestampedFilename(filename) : filename; await supabaseClient.storage.from(bucket).upload( timestampedFilename, file, fileOptions: FileOptions( cacheControl: '3600', upsert: upsert, ), ); print('+File uploaded to <$bucket>: $timestampedFilename'); } catch (e) { print('!Error uploading to Supabase: $e'); rethrow; } } Future initializeDirectories() async { final directories = [ Directory(rawFolder), Directory(dataFolder), Directory(zipFolder), Directory(extFolder), ]; for (var dir in directories) { if (!await dir.exists()) { await dir.create(recursive: true); } } } bool isZipFile(List bytes) { if (bytes.length < 4) return false; return bytes[0] == 0x50 && bytes[1] == 0x4B && bytes[2] == 0x03 && bytes[3] == 0x04; } Future> processZipFile(String filePath) async { List files = []; final bytes = await File(filePath).readAsBytes(); final archive = ZipDecoder().decodeBytes(bytes); for (final file in archive) { final filename = file.name; if (file.isFile) { final data = file.content as List; final outFile = File(path.join(extFolder, filename)); await outFile.parent.create(recursive: true); await outFile.writeAsBytes(data); files.add(path.join(extFolder, filename)); } } return files; } @override response() async { workingFolder = DateTime.now().millisecondsSinceEpoch.toString(); final supabaseClient = getSupabaseClient(request); await initializeDirectories(); final contentType = request.headers['content-type']; if (contentType == null || !contentType.toLowerCase().startsWith('multipart/form-data')) { return shelf.Response.badRequest( body: 'Content-Type must be multipart/form-data'); } try { final mediaType = MediaType.parse(contentType); final boundary = mediaType.parameters['boundary']; if (boundary == null) { return shelf.Response.badRequest(body: 'Boundary not found'); } final transformer = MimeMultipartTransformer(boundary); final bodyBytes = await request.read().expand((e) => e).toList(); final stream = Stream.fromIterable([bodyBytes]); final parts = await transformer.bind(stream).toList(); for (var part in parts) { final contentDisposition = part.headers['content-disposition']; if (contentDisposition == null) continue; final filenameMatch = RegExp(r'filename="([^"]*)"').firstMatch(contentDisposition); if (filenameMatch == null) continue; final filename = filenameMatch.group(1); if (filename == null) continue; final bytes = await part.fold>( [], (prev, element) => [...prev, ...element], ); final rawFilePath = path.join(zipFolder, filename); await File(rawFilePath).writeAsBytes(bytes); List files = []; if (isZipFile(bytes)) { files.addAll(await processZipFile(rawFilePath)); } else { final dataFilePath = path.join(extFolder, filename); await File(rawFilePath).copy(dataFilePath); files.add(dataFilePath); } bytes.clear(); //upload to supabase storage await uploadToSupabase(rawFilePath, filename, supabaseClient, bucket: 'csvhich', timestamped: false, upsert: true); //upload to supabase storage archive timestamped await uploadToSupabase(rawFilePath, filename, supabaseClient, bucket: 'csvhich_archive', timestamped: true, upsert: false); //insert data to supabase csvhichupdates await supabaseClient .from('csvhichupdates') .insert({'filename': filename}); for (var file in files) { final fileProcess = FileProcess(file, supabaseClient); await fileProcess.go(donttouchdb: true); } } return shelf.Response.ok('File processed and uploaded successfully'); } catch (e, stackTrace) { //print('Error: $e\n$stackTrace'); return shelf.Response.internalServerError( body: 'Error processing upload: $e'); } finally { supabaseClient.dispose(); await File(zipFolder).delete(recursive: true); await File(extFolder).delete(recursive: true); } } } class FileProcess { FileProcess(this.filepath, this.supabase); final String filepath; final SupabaseClient supabase; String get filename => filepath.replaceAll('\\', "/").split("/").last; final Map tables = { "secondprgtype.txt": "aclegs_csv", "ExportPGRGPNmois.txt": "pnlegs_csv", "exportPGRGPN.txt": "pnlegs_csv", "exportlicence.txt": "licences_csv", }; final Map> _headers = { "secondprgtype.txt": [ "leg_no", "fn_carrier", "fn_number", "fn_suffix", "day_of_origin", "ac_owner", "ac_subtype", "ac_version", "ac_registration", "dep_ap_actual", "dep_ap_sched", "dep_dt_est", "dep_sched_dt", "arr_ap_actual", "arr_ap_sched", "arr_dt_est", "arr_sched_dt", "slot_time_actual", "leg_type", "status", "employer_cockpit", "employer_cabin", "cycles", "delay_code_01", "delay_code_02", "delay_code_03", "delay_code_04", "delay_time_01", "delay_time_02", "delay_time_03", "delay_time_04", "subdelay_code_01", "subdelay_code_02", "subdelay_code_03", "subdelay_code_04", "pax_booked_c", "pax_booked_y", "pax_booked_trs_c", "pax_booked_trs_y", "pad_booked_c", "pad_booked_y", "offblock_dt_a", "airborne_dt_a", "landing_dt_a", "onblock_dt_a", "offblock_dt_f", "airborne_dt_f", "landing_dt_f", "onblock_dt_f", "offblock_dt_m", "airborne_dt_m", "landing_dt_m", "onblock_dt_m", "eet", ], "exportPGRGPN.txt": [ "date", "tlc", "actype", "al", "fnum", "ddep", "hdep", "ddes", "hdes", "dep", "des", "label", "type", ], "ExportPGRGPNmois.txt": [ "date", "tlc", "actype", "al", "fnum", "ddep", "hdep", "ddes", "hdes", "dep", "des", "label", "type", ], "exportlicence.txt": [ "tlc", "fname", "mname", "lname", "expire", "ac", "college", "base", ], }; final Map scopes = { "secondprgtype.txt": "day_of_origin", "exportPGRGPN.txt": "date", "ExportPGRGPNmois.txt": "date", "exportlicence.txt": "tlc", }; final Map> idToRemove = { "secondprgtype.txt": ["day_of_origin"], "exportPGRGPN.txt": ["date", "tlc"], "ExportPGRGPNmois.txt": ["date", "tlc"], "exportlicence.txt": ["tlc"], }; final Map>> trackers = { /* "secondprgtype.txt": { "table": "aclegs_log", "headers": [ "day_of_origin", "dep_sched_dt", "fn_carrier", "fn_number", "dep_ap_sched", "arr_ap_sched", // "dep_ap_actual", // "arr_ap_actual" ] }, */ "exportPGRGPN.txt": [ { "table": "pnlegs_log_roster", "groupby": ["date", "tlc"], "track": ["dep", "des", "al", "fnum", "label"] }, // { // "table": "pnlegs_log_duty", // "groupby": ["date", "dep", "des", "al", "fnum", "label"], // "track": ["tlc"] // }, // { // "table": "pnlegs_log_sched", // "groupby": ["date", "dep", "des", "al", "fnum", "label"], // "changes": ["hdep", "hdes"] // }, ], /* "ExportPGRGPNmois.txt": { "table": "pnlegs_log", "headers": ["tlc", "date", "dep", "des", "al", "fnum", "label"] }, "exportlicence.txt": { "table": "qualifs_log", "headers": ["tlc", "college", "ac", "base"] }, */ }; Future>> parseCsv() async { final headers = _headers[filename] ?? []; if (headers.isEmpty) { throw Exception('No headers found for file: $filename'); } // Initialize an empty list to hold the parsed data List> data = []; // Read the CSV file final file = File(filepath); final lines = await file.readAsLines(); // Iterate over each line in the CSV file for (int i = 0; i < lines.length; i++) { // Split the line into individual values final values = lines[i].split(','); if (values.length != headers.length) { // print('Skipping line $i: Incorrect number of values: line: $i'); continue; } // Create a map for the current row Map row = {}; // Assign each value to the corresponding header for (int j = 0; j < headers.length; j++) { row[headers[j]] = values[j].trim().removeQuotes.trim().nullIfEmpty; } // Add the row map to the data list data.add(row); } // Return the parsed data return data; } List get filesTomonitor => _headers.keys.toList(); Future go({bool donttouchdb = false}) async { if (!filesTomonitor.contains(filename)) return; final allmapsToInsert = await parseCsv(); final scopeName = scopes[filename] ?? ""; final scopesInNew = allmapsToInsert .fold({}, (t, e) => t..add(e[scopeName] ?? "")).toList(); for (var scopeInNew in scopesInNew) { final mapsToInsert = allmapsToInsert.where((e) => e[scopeName] == scopeInNew).toList(); List> oldIds = []; List> oldComparable = []; //load old data final res = await supabase .from(tables[filename]!) .select() .eq(scopeName, scopeInNew) .limit(300000); oldIds.addAll(res.map((e) => {"id": e["id"]})); oldComparable.addAll(res.map((e) => e..remove("id"))); final comparisonResult = compareLists(oldComparable, mapsToInsert); final indexToRemove = comparisonResult.removeIndices; final indexToMaintain = comparisonResult.maintainIndices; final dataToInsert = comparisonResult.insertData; try { if (!donttouchdb) for (var e in chunkList( indexToRemove.map((f) => oldIds[f]['id']).toList(), 100)) { await supabase .from(tables[filename]!) // Replace with your actual table name .delete() .inFilter('id', e); } // insering new data if (!donttouchdb) await supabase .from(tables[filename]!) // Replace with your actual table name .insert(dataToInsert); } catch (e, stackTrace) { print('Error: $e\n$stackTrace'); } print( " Scope:$scopeInNew insert:${dataToInsert.length} remove:${indexToRemove.length} maintain:${indexToMaintain.length}"); for (var tracker in trackers[filename] ?? []) { final table = tracker["table"]; final groupby = tracker["groupby"] ?? []; final track = tracker["track"] ?? []; final stateOld = oldComparable.groupBy( (e) => groupby.map((f) => e[f]).join("|"), dataFunction: (e) => e .filterKeys(track) .values .map((j) => j == null ? "" : j) .join("_")); final stateNew = dataToInsert.groupBy( (e) => groupby.map((f) => e[f]).join("|"), dataFunction: (e) => e .filterKeys(track) .values .map((j) => j == null ? "" : j) .join("_")); List logs = []; for (var key in (stateOld.keys.toList()..addAll(stateNew.keys)).toSet()) { final (add, remove) = (stateNew[key] ?? []).diff(stateOld[key] ?? []); if (add.isNotEmpty || remove.isNotEmpty) { logs.add("$key:\n +$add}\n -$remove\n"); } } print(" Tracker:$table"); print(" $logs"); } } } ({ List maintainIndices, List removeIndices, // List insertIndices List insertData }) compareLists( List> map1, List> map2) { List maintainIndices = []; List removeIndices = []; List> insertData = map2; // Find indices to maintain and remove in map1 for (int i = 0; i < map1.length; i++) { final pos = insertData.findMap(map1[i]); if (pos > -1) { maintainIndices.add(i); // Item exists in both lists insertData.removeAt(pos); } else { removeIndices.add(i); // Item does not exist in map2 } } return ( maintainIndices: maintainIndices, removeIndices: removeIndices, insertData: insertData ); } List> chunkList(List list, int chunkSize) { if (chunkSize <= 0) { throw ArgumentError('chunkSize must be greater than 0'); } List> chunks = []; for (var i = 0; i < list.length; i += chunkSize) { chunks.add(list.sublist( i, i + chunkSize > list.length ? list.length : i + chunkSize)); } return chunks; } } extension IterableDiff on Iterable { (Iterable add, Iterable remove) diff(Iterable listB) { // Convert listA to a list for easier indexing final listA = this; // Items to add are those in listB but not in listA final add = listB.where((item) => !listA.contains(item)); // Items to remove are those in listA but not in listB final remove = listA.where((item) => !listB.contains(item)); return (add, remove); } } extension CompareIterables on Iterable { /// Compares this iterable with another iterable and returns a map containing: /// - 'added': Items that are in the other iterable but not in this one. /// - 'removed': Items that are in this iterable but not in the other one. (Iterable add, Iterable remove) compareWith(Iterable other) { final Set thisSet = this.toSet(); final Set otherSet = other.toSet(); final Set added = otherSet.difference(thisSet); final Set removed = thisSet.difference(otherSet); return (added, removed); } } extension FilterMapByKeys on Map { /// Returns a new map containing only the keys (and their associated values) /// that are present in the [keysToKeep] list. Map filterKeys(List keysToKeep) { return Map.fromEntries( entries .where((entry) => keysToKeep.contains(entry.key)) .cast>(), ); } } extension RemoveNull on Iterable { /// Returns a new iterable with all null values removed. Iterable removeNull() { return where((element) => element != null).cast(); } } extension GroupBy on Iterable { Map groupBy(K Function(T) keyFunction, {Function(T)? dataFunction, bool Function(T)? keyIsNullFunction}) { final map = {}; for (final element in this) { final key = keyFunction(element); final keyIsNull = keyIsNullFunction == null ? false : keyIsNullFunction(element); if (keyIsNull || key == null) continue; if (dataFunction != null) { map.putIfAbsent(key, () => []).add(dataFunction(element)); } else map.putIfAbsent(key, () => []).add(element); } return map; } } extension NullIfEmpty on String { String? get nullIfEmpty => isEmpty ? null : this; } extension RemoveQuotes on String { String get removeQuotes { if (isEmpty) return this; // Return if the string is empty // Remove the first and last characters if they are quotes String result = this; // Check if the first character is a quote bool startsWithQuote = result.startsWith('"') || result.startsWith("'"); if (startsWithQuote) result = result.substring(1); // Check if the last character is a quote bool endsWithQuote = result.endsWith('"') || result.endsWith("'"); if (endsWithQuote) result = result.substring(0, result.length - 1); return result; } } bool mapsAreEqual(Map map1, Map map2) { if (map1.length != map2.length) return false; for (var key in map1.keys) { if (map1[key] != map2[key]) return false; } return true; } extension ContainsMap on List> { bool containsMap(Map map) { for (var item in this) { if (mapsAreEqual(item, map)) return true; } return false; } int findMap(Map map) { for (int i = 0; i < this.length; i++) { if (mapsAreEqual(this.elementAt(i), map)) return i; } return -1; } }